From f35abb122afb47efdf9ed1f0d46b7c06eab56df4 Mon Sep 17 00:00:00 2001 From: Ben Barclay Date: Thu, 18 Jun 2026 15:56:13 +1000 Subject: [PATCH] =?UTF-8?q?feat(gateway):=20multiplex=20phase=201=20?= =?UTF-8?q?=E2=80=94=20HTTP-inbound=20/p/<profile>/=20routing=20(webhook)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Serve webhook inbound for multiple profiles off the one shared listener via a URL prefix, with no second port bound. - SessionSource gains a 'profile' field (round-trips through to_dict/from_dict; omitted when unset so existing serialization is unchanged). It carries which profile an inbound message was routed to. - WebhookAdapter registers /p/{profile}/webhooks/{route_name} alongside the existing /webhooks/{route_name}. _resolve_request_profile validates the prefix against profiles_to_serve(): None when absent or multiplexing is off (ignored, handled as default — no spurious 404), the profile name when valid, _PROFILE_REJECTED (→ 404) when the profile isn't served. The resolved profile is stamped onto the SessionSource. - session-key namespacing and the per-turn home/credential scope now prefer source.profile: SessionStore._resolve_profile_for_key(source), _session_key_for_source fallback, and _resolve_profile_home_for_source all honor it (→ the agent turn resolves that profile's config/skills/credentials via the Phase 2 _profile_runtime_scope). Constraint: routing inbound needs no per-profile platform credential, but the agent still needs the routed profile's provider key — delivered by Phase 2's secret scope. api_server (OpenAI-compatible surface) profile routing is a focused follow-on; its source-construction path differs from webhook's. Tests: SessionSource.profile round-trip + namespace drive; _resolve_request_profile accept/reject/ignore matrix. 
--- gateway/platforms/webhook.py | 51 ++++++++++++++ gateway/run.py | 22 +++--- gateway/session.py | 25 ++++--- tests/gateway/test_multiplex_http_routing.py | 73 ++++++++++++++++++++ 4 files changed, 153 insertions(+), 18 deletions(-) create mode 100644 tests/gateway/test_multiplex_http_routing.py diff --git a/gateway/platforms/webhook.py b/gateway/platforms/webhook.py index 222adf4c2ea..d9f98282a8d 100644 --- a/gateway/platforms/webhook.py +++ b/gateway/platforms/webhook.py @@ -57,6 +57,11 @@ from gateway.platforms.base import ( logger = logging.getLogger(__name__) +# Sentinel returned by _resolve_request_profile when a /p// prefix +# names a profile this gateway does not serve (→ 404). Distinct from None +# (no prefix / multiplexing off → handle as the default profile). +_PROFILE_REJECTED = object() + _BUILTIN_DELIVER_PLATFORMS = { "telegram", "discord", "slack", "signal", "sms", "whatsapp", "matrix", "mattermost", "homeassistant", "email", "dingtalk", @@ -189,6 +194,14 @@ class WebhookAdapter(BasePlatformAdapter): app = web.Application() app.router.add_get("/health", self._handle_health) app.router.add_post("/webhooks/{route_name}", self._handle_webhook) + # Multi-profile multiplexing: a /p//webhooks/ prefix + # routes the inbound event to that profile. Same handler; the profile is + # captured from the path and stamped onto the SessionSource so the agent + # turn resolves that profile's config/skills/credentials. Only honored + # when gateway.multiplex_profiles is on (the handler validates). + app.router.add_post( + "/p/{profile}/webhooks/{route_name}", self._handle_webhook + ) # Port conflict detection — fail fast if port is already in use import socket as _socket @@ -397,6 +410,35 @@ class WebhookAdapter(BasePlatformAdapter): except Exception as e: logger.error("[webhook] Failed to reload dynamic routes: %s", e) + def _resolve_request_profile(self, request: "web.Request"): + """Resolve + validate the /p// URL prefix on a webhook request. 
+ + Returns: + - ``None`` when no profile prefix is present, or multiplexing is off + (the prefix is ignored, request handled as the default profile). + - the profile name (str) when present, multiplexing is on, and the + profile is one this gateway serves. + - ``_PROFILE_REJECTED`` when a prefix is present but the profile is + unknown/unconfigured (handler returns 404). + """ + profile = (request.match_info.get("profile") or "").strip() + if not profile: + return None + runner = self.gateway_runner + cfg = getattr(runner, "config", None) + if not getattr(cfg, "multiplex_profiles", False): + # Prefix supplied but multiplexing is off — ignore it, behave as + # the single-profile gateway (don't 404 a would-be valid route). + return None + try: + from hermes_cli.profiles import profiles_to_serve + served = {name for name, _ in profiles_to_serve(multiplex=True)} + except Exception: + return _PROFILE_REJECTED + if profile not in served: + return _PROFILE_REJECTED + return profile + async def _handle_webhook(self, request: "web.Request") -> "web.Response": """POST /webhooks/{route_name} — receive and process a webhook event.""" # Hot-reload dynamic subscriptions on each request (mtime-gated, cheap) @@ -405,6 +447,13 @@ class WebhookAdapter(BasePlatformAdapter): route_name = request.match_info.get("route_name", "") route_config = self._routes.get(route_name) + # Multi-profile: resolve + validate the /p// prefix if present. 
+ profile = self._resolve_request_profile(request) + if profile is _PROFILE_REJECTED: + return web.json_response( + {"error": "Unknown or unconfigured profile"}, status=404 + ) + if not route_config: return web.json_response( {"error": f"Unknown route: {route_name}"}, status=404 @@ -641,6 +690,8 @@ class WebhookAdapter(BasePlatformAdapter): user_id=f"webhook:{route_name}", user_name=route_name, ) + if profile and isinstance(profile, str): + source.profile = profile event = MessageEvent( text=prompt, message_type=MessageType.TEXT, diff --git a/gateway/run.py b/gateway/run.py index d0b27680ae9..4d3e22c412c 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2884,11 +2884,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # agent:main) unless multiplexing is on, then the active profile. _profile = None if getattr(config, "multiplex_profiles", False): - try: - from hermes_cli.profiles import get_active_profile_name - _profile = get_active_profile_name() or "default" - except Exception: - _profile = None + if source.profile: + _profile = source.profile + else: + try: + from hermes_cli.profiles import get_active_profile_name + _profile = get_active_profile_name() or "default" + except Exception: + _profile = None return build_session_key( source, group_sessions_per_user=getattr(config, "group_sessions_per_user", True), @@ -13902,14 +13905,13 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew def _resolve_profile_home_for_source(self, source: SessionSource) -> "Path": """Resolve which profile's HERMES_HOME should serve this inbound source. - Phase 2 baseline: the active profile (the multiplexer's own home). Phase - 1/3 wire real per-source attribution (URL prefix, per-credential adapter - ownership) by overriding the resolved profile on the source/adapter; this - method is the single point they hook. 
+ Prefers the profile the source was routed to (``source.profile`` — set + by the /p// URL prefix or a per-credential adapter), falling + back to the active profile (the multiplexer's own home). """ from hermes_cli.profiles import get_active_profile_name, get_profile_dir try: - name = get_active_profile_name() or "default" + name = (source.profile or "").strip() or get_active_profile_name() or "default" return get_profile_dir(name) except Exception: from hermes_constants import get_hermes_home diff --git a/gateway/session.py b/gateway/session.py index 83b5ba5a812..d07c65ec29f 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -92,6 +92,11 @@ class SessionSource: parent_chat_id: Optional[str] = None # Parent channel when chat_id refers to a thread message_id: Optional[str] = None # ID of the triggering message (for pin/reply/react) role_authorized: bool = False # True when adapter granted access via role (not user ID) + # Profile this inbound message is routed to in a multiplexing gateway + # (from the /p// URL prefix or per-credential adapter ownership). + # None => the gateway's active/default profile. Drives both session-key + # namespacing and the per-turn config/credential scope. 
+ profile: Optional[str] = None @property def description(self) -> str: @@ -135,6 +140,8 @@ class SessionSource: d["parent_chat_id"] = self.parent_chat_id if self.message_id: d["message_id"] = self.message_id + if self.profile: + d["profile"] = self.profile return d @classmethod @@ -153,6 +160,7 @@ class SessionSource: guild_id=data.get("guild_id"), parent_chat_id=data.get("parent_chat_id"), message_id=data.get("message_id"), + profile=data.get("profile"), ) @@ -802,18 +810,19 @@ class SessionStore: logger.debug("Could not remove temp file %s: %s", tmp_path, e) raise - def _resolve_profile_for_key(self) -> Optional[str]: + def _resolve_profile_for_key(self, source: Optional[SessionSource] = None) -> Optional[str]: """Return the profile namespace for session keys, or None when off. - Phase 0: when ``multiplex_profiles`` is disabled (default), returns - ``None`` so keys stay in the legacy ``agent:main`` namespace — - byte-identical to before. When enabled, returns the active profile name - so this store's keys are namespaced to it. Per-source profile - attribution (one store serving many profiles) arrives in a later phase; - until then the active profile is the correct namespace. + When ``multiplex_profiles`` is disabled (default), returns ``None`` so + keys stay in the legacy ``agent:main`` namespace — byte-identical to + before. When enabled, prefers the profile the inbound source was routed + to (``source.profile`` — set by the /p// URL prefix or + per-credential adapter), falling back to the active profile name. 
""" if not getattr(self.config, "multiplex_profiles", False): return None + if source is not None and source.profile: + return source.profile try: from hermes_cli.profiles import get_active_profile_name return get_active_profile_name() or "default" @@ -826,7 +835,7 @@ class SessionStore: source, group_sessions_per_user=getattr(self.config, "group_sessions_per_user", True), thread_sessions_per_user=getattr(self.config, "thread_sessions_per_user", False), - profile=self._resolve_profile_for_key(), + profile=self._resolve_profile_for_key(source), ) def _is_session_expired(self, entry: SessionEntry) -> bool: diff --git a/tests/gateway/test_multiplex_http_routing.py b/tests/gateway/test_multiplex_http_routing.py new file mode 100644 index 00000000000..e144030c351 --- /dev/null +++ b/tests/gateway/test_multiplex_http_routing.py @@ -0,0 +1,73 @@ +"""Phase 1: HTTP-inbound /p// routing for the webhook adapter.""" +import pytest + +from gateway.config import GatewayConfig, Platform +from gateway.session import SessionSource, build_session_key + + +class TestSessionSourceProfileField: + def test_profile_roundtrips(self): + s = SessionSource( + platform=Platform.WEBHOOK if hasattr(Platform, "WEBHOOK") else Platform.TELEGRAM, + chat_id="c1", + chat_type="webhook", + profile="coder", + ) + restored = SessionSource.from_dict(s.to_dict()) + assert restored.profile == "coder" + + def test_profile_absent_not_serialized(self): + s = SessionSource(platform=Platform.TELEGRAM, chat_id="c1", chat_type="dm") + assert "profile" not in s.to_dict() + + def test_source_profile_drives_session_key_namespace(self): + s = SessionSource(platform=Platform.TELEGRAM, chat_id="99", chat_type="dm") + # build_session_key takes profile explicitly; the adapter passes + # source.profile through. Verify the namespace follows it. 
+ assert build_session_key(s, profile="coder") == "agent:coder:telegram:dm:99" + + +class TestWebhookProfileResolution: + """_resolve_request_profile validates the /p// prefix.""" + + def _adapter(self, multiplex: bool, served=("default", "coder")): + from gateway.platforms.webhook import WebhookAdapter, _PROFILE_REJECTED + + class _FakeReq: + def __init__(self, profile): + self.match_info = {"profile": profile} if profile is not None else {} + + cfg = GatewayConfig(multiplex_profiles=multiplex) + + class _Runner: + config = cfg + + # Construct minimally; we only call _resolve_request_profile. + adapter = WebhookAdapter.__new__(WebhookAdapter) + adapter.gateway_runner = _Runner() + return adapter, _FakeReq, _PROFILE_REJECTED, served + + def test_no_prefix_returns_none(self): + adapter, Req, _REJ, _ = self._adapter(multiplex=True) + assert adapter._resolve_request_profile(Req(None)) is None + + def test_prefix_ignored_when_multiplex_off(self): + adapter, Req, _REJ, _ = self._adapter(multiplex=False) + # Even a bogus profile is ignored (not 404'd) when multiplexing is off. + assert adapter._resolve_request_profile(Req("anything")) is None + + def test_known_profile_accepted(self, monkeypatch): + adapter, Req, _REJ, served = self._adapter(multiplex=True) + monkeypatch.setattr( + "hermes_cli.profiles.profiles_to_serve", + lambda multiplex: [(n, None) for n in served], + ) + assert adapter._resolve_request_profile(Req("coder")) == "coder" + + def test_unknown_profile_rejected(self, monkeypatch): + adapter, Req, REJ, served = self._adapter(multiplex=True) + monkeypatch.setattr( + "hermes_cli.profiles.profiles_to_serve", + lambda multiplex: [(n, None) for n in served], + ) + assert adapter._resolve_request_profile(Req("ghost")) is REJ