mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-21 10:22:18 +00:00
fix(relay): make hosted gateways actually connect AND complete the inbound/outbound round-trip (#48828)
* fix(relay): enable RELAY platform + normalize dial URL so hosted gateways actually connect Three bugs blocked a self-provisioned hosted gateway from ever establishing its inbound relay WS (found while standing up the live staging end-to-end). Each masked the next; all three are needed for inbound to work. 1. RELAY platform never enabled in config.platforms (gateway/config.py). register_relay_adapter() puts the adapter in the platform_registry, but start_gateway()'s connect loop iterates self.config.platforms — which never contained Platform.RELAY. So the adapter was "registered" but never connected (logs showed "relay adapter registered" then "No messaging platforms enabled"). Fix: _apply_env_overrides now enables Platform.RELAY (mirroring relay_url into extra for the connected-checker) when GATEWAY_RELAY_URL (env) or gateway.relay_url (yaml) is set. Absent -> no RELAY entry (direct/ single-tenant gateways unaffected). 2. URL scheme not converted for the WS dial (gateway/relay/ws_transport.py). The relay URL is configured once as the http(s):// base (used as-is for the provision POST), but websockets.connect rejects http(s):// with "scheme isn't ws or wss". Fix: _ws_dial_url converts https->wss / http->ws. 3. /relay path not appended (same helper). The connector mounts its WebSocketServer at path "/relay" and returns HTTP 400 on an upgrade to any other path. GATEWAY_RELAY_URL is the base (no /relay), so the dial hit "/" -> 400. Fix: _ws_dial_url ensures the path ends in /relay. Idempotent — a URL already carrying ws(s):// and/or /relay is unchanged, so provision's _provision_url (which derives /relay/provision from either form) still works. Why the cross-repo E2E missed #2/#3: the stub connector binds ws://host:port and its websockets.serve accepts ANY path, so neither the scheme nor the /relay path was exercised. Real connector needs both. 
Verified live on staging hermes-agent-stg-automated-perception-5054: after the fixes the gateway logs "Connecting to relay..." -> "✓ relay connected" -> "Gateway running with 1 platform(s)" against wss://gateway-gateway.staging-nousresearch.com/relay, stable. Tests: added _ws_dial_url scheme+path+idempotency cases (test_ws_transport.py) and RELAY-platform-enablement cases for env + yaml + absent (test_config.py). Full gateway/relay + config suites green (191 passed). Relay-adapter lane. EXPERIMENTAL. * fix(relay): re-attach guild_id to outbound so connector egress resolves the tenant The final bug in the hosted-relay round-trip. Inbound worked end to end (Discord -> connector -> bus -> agent WS -> agent runs -> reply), but the reply's egress was declined by the connector: "discord egress declined: target not routed to an onboarded tenant". Cause: the connector's routedEgressGuard resolves the owning tenant from the OUTBOUND action's metadata.guild_id (Discord's routing discriminator). The gateway's generic delivery path builds outbound metadata via run.py _thread_metadata_for_source, which only carries thread_id (and returns None entirely for a non-threaded message) — so guild_id never reached the connector, tenant resolution failed, and the shared bot refused to post. Fix (relay-adapter-local, no perturbation of the generic delivery path or other platforms): RelayAdapter learns chat_id -> guild_id from each inbound event (_capture_scope) and re-attaches it to the outbound action's metadata in send() (_with_scope) when not already present. No-op for chats we never saw inbound (e.g. DMs) and never overwrites an explicit guild_id. Verified live on staging hermes-agent-stg-automated-perception-5054: an @mention in #general now produces a visible bot reply — full multi-tenant relay round-trip (real Discord -> shared connector bot -> tenant routing -> agent WS -> reply egress -> Discord). 
Tests: _capture_scope/_with_scope reattach, no-scope no-op, explicit-guild_id preserved (test_relay_adapter.py). Full relay + config suites green (160 passed). Relay-adapter lane. EXPERIMENTAL.
This commit is contained in:
parent
245b95b094
commit
a64fc490fe
6 changed files with 220 additions and 2 deletions
|
|
@ -2143,5 +2143,24 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
|
|||
except Exception as e:
|
||||
logger.debug("Plugin platform enable pass failed: %s", e)
|
||||
|
||||
# Relay (generic connector-fronted platform, EXPERIMENTAL). Enabled when a
|
||||
# connector relay URL is configured via GATEWAY_RELAY_URL (env) or
|
||||
# gateway.relay_url (config.yaml). The adapter is registered into the
|
||||
# platform_registry at gateway startup (gateway.relay.register_relay_adapter)
|
||||
# and dials OUT to the connector — so, like Telegram/Matrix, it has no public
|
||||
# inbound port and just needs Platform.RELAY present+enabled in
|
||||
# config.platforms for start_gateway()'s connect loop to bring it up. The
|
||||
# connected-checker (Platform.RELAY in _PLATFORM_CONNECTED_CHECKERS) keys on
|
||||
# extra["relay_url"], so mirror the URL into extra here.
|
||||
relay_url_env = os.getenv("GATEWAY_RELAY_URL", "").strip()
|
||||
relay_url_yaml = ""
|
||||
existing_relay = config.platforms.get(Platform.RELAY)
|
||||
if existing_relay is not None:
|
||||
relay_url_yaml = str(existing_relay.extra.get("relay_url") or "").strip()
|
||||
relay_url_val = relay_url_env or relay_url_yaml
|
||||
if relay_url_val:
|
||||
relay_config = _enable_from_env(Platform.RELAY)
|
||||
relay_config.extra["relay_url"] = relay_url_val.rstrip("/")
|
||||
|
||||
for platform_config in config.platforms.values():
|
||||
platform_config.extra.pop("_enabled_explicit", None)
|
||||
|
|
|
|||
|
|
@ -57,6 +57,13 @@ class RelayAdapter(BasePlatformAdapter):
|
|||
self._transport = transport
|
||||
# Capability surface read by stream_consumer (getattr(..., 4096)).
|
||||
self.MAX_MESSAGE_LENGTH = descriptor.max_message_length
|
||||
# chat_id -> guild_id (Discord) / workspace scope, learned from inbound
|
||||
# events. The connector's egress guard resolves the owning tenant from
|
||||
# the OUTBOUND action's metadata.guild_id; the gateway's generic delivery
|
||||
# path (run.py _thread_metadata_for_source) only carries thread_id, so we
|
||||
# re-attach the scope here from what we saw inbound. Keyed by chat_id
|
||||
# (channel) since that's what send() receives. See routedEgressGuard.ts.
|
||||
self._scope_by_chat: Dict[str, str] = {}
|
||||
self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain")
|
||||
|
||||
# ── capability surface (from descriptor) ─────────────────────────────
|
||||
|
|
@ -108,8 +115,35 @@ class RelayAdapter(BasePlatformAdapter):
|
|||
|
||||
async def _on_inbound(self, event) -> None:
|
||||
"""Bridge a connector-delivered MessageEvent into the normal adapter path."""
|
||||
self._capture_scope(event)
|
||||
await self.handle_message(event)
|
||||
|
||||
def _capture_scope(self, event) -> None:
|
||||
"""Remember chat_id -> guild scope from an inbound event so our outbound
|
||||
(the agent's reply) can re-assert it for the connector's egress tenant
|
||||
resolution. Never raises — scope tracking must not break inbound."""
|
||||
try:
|
||||
src = getattr(event, "source", None)
|
||||
scope = getattr(src, "guild_id", None) if src else None
|
||||
chat = getattr(src, "chat_id", None) if src else None
|
||||
if scope and chat:
|
||||
self._scope_by_chat[str(chat)] = str(scope)
|
||||
except Exception: # noqa: BLE001 - scope tracking must never break inbound
|
||||
pass
|
||||
|
||||
def _with_scope(self, chat_id: str, metadata: Optional[Dict[str, Any]]) -> Dict[str, Any]:
|
||||
"""Ensure the outbound metadata carries guild_id for the connector's
|
||||
egress tenant resolution. The connector resolves the owning tenant from
|
||||
metadata.guild_id (Discord); without it egress is declined as
|
||||
'target not routed to an onboarded tenant'. No-op when we have no scope
|
||||
for this chat (e.g. DMs) or it's already present."""
|
||||
meta: Dict[str, Any] = dict(metadata or {})
|
||||
if not meta.get("guild_id"):
|
||||
scope = self._scope_by_chat.get(str(chat_id))
|
||||
if scope:
|
||||
meta["guild_id"] = scope
|
||||
return meta
|
||||
|
||||
async def on_interrupt(self, session_key: str, chat_id: str) -> None:
|
||||
"""Bridge a connector-delivered /stop into the adapter's interrupt path.
|
||||
|
||||
|
|
@ -140,7 +174,7 @@ class RelayAdapter(BasePlatformAdapter):
|
|||
"chat_id": chat_id,
|
||||
"content": content,
|
||||
"reply_to": reply_to,
|
||||
"metadata": metadata or {},
|
||||
"metadata": self._with_scope(chat_id, metadata),
|
||||
}
|
||||
)
|
||||
return SendResult(
|
||||
|
|
|
|||
|
|
@ -54,6 +54,35 @@ _HANDSHAKE_TIMEOUT_S = 30.0
|
|||
_OUTBOUND_TIMEOUT_S = 30.0
|
||||
|
||||
|
||||
def _ws_dial_url(url: str) -> str:
|
||||
"""Normalize a connector URL to the ``ws(s)://…/relay`` dial target.
|
||||
|
||||
The relay URL is configured once (``GATEWAY_RELAY_URL`` / ``gateway.relay_url``)
|
||||
as the connector's BASE URL (e.g. ``https://connector.example``) and shared by
|
||||
both the provision POST (which needs ``http(s)://…/relay/provision`` — see
|
||||
``_provision_url``) and the WS dial (which needs ``ws(s)://…/relay``, the path
|
||||
the connector mounts its ``WebSocketServer`` on). Two normalizations, both
|
||||
load-bearing:
|
||||
|
||||
- scheme: ``https -> wss``, ``http -> ws`` (``websockets.connect`` raises
|
||||
"scheme isn't ws or wss" on an http(s) URL).
|
||||
- path: ensure it ends in ``/relay`` (the connector returns HTTP 400 on an
|
||||
upgrade to any other path, since the WS server is mounted at ``/relay``).
|
||||
|
||||
Idempotent: an already-``ws(s)://…/relay`` URL is returned unchanged, so a URL
|
||||
configured WITH the scheme and/or ``/relay`` still works.
|
||||
"""
|
||||
raw = (url or "").strip()
|
||||
if raw.startswith("https://"):
|
||||
raw = "wss://" + raw[len("https://"):]
|
||||
elif raw.startswith("http://"):
|
||||
raw = "ws://" + raw[len("http://"):]
|
||||
raw = raw.rstrip("/")
|
||||
if not raw.endswith("/relay"):
|
||||
raw = f"{raw}/relay"
|
||||
return raw
|
||||
|
||||
|
||||
def _event_from_wire(raw: Dict[str, Any]) -> MessageEvent:
|
||||
"""Rebuild a MessageEvent from the connector's normalized inbound payload.
|
||||
|
||||
|
|
@ -118,7 +147,7 @@ class WebSocketRelayTransport:
|
|||
"WebSocketRelayTransport requires the 'websockets' package "
|
||||
"(install the messaging extra)."
|
||||
)
|
||||
self._url = url
|
||||
self._url = _ws_dial_url(url)
|
||||
self._platform = platform
|
||||
self._bot_id = bot_id
|
||||
self._connect_timeout_s = connect_timeout_s
|
||||
|
|
|
|||
|
|
@ -75,3 +75,68 @@ async def test_send_without_transport_returns_failure():
|
|||
result = await a.send("chat1", "hello")
|
||||
assert result.success is False
|
||||
assert result.error == "no transport"
|
||||
|
||||
|
||||
class _CaptureTransport:
|
||||
"""Minimal RelayTransport stand-in that records the outbound action."""
|
||||
|
||||
def __init__(self):
|
||||
self.sent = None
|
||||
|
||||
def set_inbound_handler(self, h): # noqa: D401
|
||||
self._h = h
|
||||
|
||||
async def send_outbound(self, action):
|
||||
self.sent = action
|
||||
return {"success": True, "message_id": "m1"}
|
||||
|
||||
|
||||
def _make_event(chat_id="chan-1", guild_id="guild-9"):
    """Build a text MessageEvent on the RELAY platform for a channel chat.

    The event's source carries the given ``chat_id`` and ``guild_id``; the
    message text is always ``"hi"``.
    """
    from gateway.platforms.base import MessageEvent, MessageType
    from gateway.session import SessionSource

    source_fields = {
        "platform": Platform.RELAY,
        "chat_id": chat_id,
        "chat_type": "channel",
        "guild_id": guild_id,
    }
    origin = SessionSource(**source_fields)
    return MessageEvent(text="hi", source=origin, message_type=MessageType.TEXT)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_send_reattaches_guild_id_from_inbound_scope():
    """send() must put back the guild scope learned from an inbound event.

    The connector's egress guard resolves the owning tenant from
    metadata.guild_id, and the gateway's generic delivery path drops it, so the
    relay adapter has to re-attach it. Regression for live 'discord egress
    declined: target not routed to an onboarded tenant'.
    """
    transport = _CaptureTransport()
    adapter = RelayAdapter(
        PlatformConfig(), make_desc(platform="discord"), transport=transport
    )

    # Stand in for the connector delivering a message in guild-9 / chan-1:
    # only the scope capture runs, not the full handle_message pipeline.
    inbound = _make_event(chat_id="chan-1", guild_id="guild-9")
    adapter._capture_scope(inbound)

    await adapter.send("chan-1", "the reply")

    outbound_meta = transport.sent["metadata"]
    assert outbound_meta.get("guild_id") == "guild-9"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_send_without_known_scope_omits_guild_id():
    """No inbound seen for the chat (e.g. a DM) means no guild_id is attached:
    the adapter must not invent a scope."""
    transport = _CaptureTransport()
    adapter = RelayAdapter(
        PlatformConfig(), make_desc(platform="discord"), transport=transport
    )

    await adapter.send("unknown-chat", "hi")

    outbound_meta = transport.sent["metadata"]
    assert "guild_id" not in outbound_meta
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_send_preserves_explicit_guild_id():
    """A guild_id supplied by the caller wins over the one learned inbound."""
    transport = _CaptureTransport()
    adapter = RelayAdapter(
        PlatformConfig(), make_desc(platform="discord"), transport=transport
    )
    # Learn guild-9 for chan-1 first, so there is something to (not) overwrite with.
    adapter._capture_scope(_make_event(chat_id="chan-1", guild_id="guild-9"))

    explicit_meta = {"guild_id": "explicit-1"}
    await adapter.send("chan-1", "hi", metadata=explicit_meta)

    assert transport.sent["metadata"]["guild_id"] == "explicit-1"
|
||||
|
|
|
|||
|
|
@ -177,3 +177,25 @@ async def test_disconnect_fails_pending_waiters_cleanly(server):
|
|||
# After disconnect, an outbound returns a structured failure rather than hanging.
|
||||
result = await t.send_outbound({"op": "send", "chat_id": "c", "content": "x"})
|
||||
assert result["success"] is False
|
||||
|
||||
|
||||
def test_https_url_normalized_to_wss():
    """An http(s):// base URL is turned into the ws(s)://…/relay dial target.

    The relay URL is configured once as the http(s):// BASE (for the provision
    POST), but websockets.connect needs ws(s):// and the connector mounts its WS
    server at /relay, so the transport must convert the scheme AND ensure the
    /relay path. Regression for the live staging failures 'scheme isn't ws or
    wss' then 'server rejected WebSocket connection: HTTP 400' (wrong path).
    """
    expected_by_configured = {
        "https://connector.example": "wss://connector.example/relay",
        "http://connector.local:8080": "ws://connector.local:8080/relay",
    }
    for configured, dial_target in expected_by_configured.items():
        transport = WebSocketRelayTransport(configured, "discord", "b")
        assert transport._url == dial_target
|
||||
|
||||
|
||||
def test_ws_dial_url_idempotent_with_scheme_and_path():
    """URLs that already carry ws(s):// and/or /relay are not double-processed."""
    cases = (
        # Already fully normalized -> unchanged.
        ("wss://connector.example/relay", "wss://connector.example/relay"),
        # /relay present with a trailing slash -> scheme swapped, no second /relay.
        ("https://connector.example/relay/", "wss://connector.example/relay"),
        # ws:// scheme already, path missing -> only /relay appended.
        ("ws://127.0.0.1:9", "ws://127.0.0.1:9/relay"),
    )
    for configured, dial_target in cases:
        transport = WebSocketRelayTransport(configured, "discord", "b")
        assert transport._url == dial_target
|
||||
|
|
|
|||
|
|
@ -311,6 +311,55 @@ class TestLoadGatewayConfig:
|
|||
|
||||
assert config.quick_commands == {"limits": {"type": "exec", "command": "echo ok"}}
|
||||
|
||||
def test_relay_platform_enabled_from_env_url(self, tmp_path, monkeypatch):
    """Setting GATEWAY_RELAY_URL must yield an enabled Platform.RELAY entry.

    start_gateway()'s connect loop iterates config.platforms, so registering
    the adapter in the platform_registry is NOT enough: an un-enabled RELAY
    never connects (the 'relay registered but no inbound' bug).
    """
    home = tmp_path / ".hermes"
    home.mkdir()
    env = (
        ("HERMES_HOME", str(home)),
        ("GATEWAY_RELAY_URL", "https://connector.example/relay/"),
    )
    for name, value in env:
        monkeypatch.setenv(name, value)

    loaded = load_gateway_config()

    assert Platform.RELAY in loaded.platforms
    relay_cfg = loaded.platforms[Platform.RELAY]
    assert relay_cfg.enabled is True
    # The URL is mirrored into extra (for the connected-checker) with the
    # trailing slash removed.
    assert relay_cfg.extra.get("relay_url") == "https://connector.example/relay"
    assert Platform.RELAY in loaded.get_connected_platforms()
|
||||
|
||||
def test_relay_platform_absent_when_url_unset(self, tmp_path, monkeypatch):
    """Without a relay URL there is no RELAY entry at all, leaving
    direct/single-tenant gateways untouched."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    # Make sure a value leaking in from the real environment cannot enable RELAY.
    monkeypatch.delenv("GATEWAY_RELAY_URL", raising=False)

    loaded = load_gateway_config()

    assert Platform.RELAY not in loaded.platforms
|
||||
|
||||
def test_relay_platform_enabled_from_config_yaml(self, tmp_path, monkeypatch):
    """A relay_url configured in config.yaml also enables RELAY (env-less path).

    The YAML is assembled line by line with explicit indentation so that
    ``relay_url`` is genuinely nested under
    ``gateway -> platforms -> relay -> extra``; with every key at the same
    indent the keys would parse as siblings of ``platforms`` instead.
    """
    hermes_home = tmp_path / ".hermes"
    hermes_home.mkdir()
    config_path = hermes_home / "config.yaml"
    # NOTE(review): two-space nesting is assumed here — confirm against the
    # layout load_gateway_config() expects for platform entries.
    yaml_lines = [
        "gateway:",
        "  platforms:",
        "    relay:",
        "      extra:",
        "        relay_url: https://connector.example/relay",
    ]
    config_path.write_text("\n".join(yaml_lines) + "\n", encoding="utf-8")
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    # Ensure only the YAML path is exercised, not the env override.
    monkeypatch.delenv("GATEWAY_RELAY_URL", raising=False)

    config = load_gateway_config()

    assert Platform.RELAY in config.platforms
    assert config.platforms[Platform.RELAY].enabled is True
|
||||
|
||||
def test_bridges_group_sessions_per_user_from_config_yaml(self, tmp_path, monkeypatch):
|
||||
hermes_home = tmp_path / ".hermes"
|
||||
hermes_home.mkdir()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue