feat(gateway): multiplex phase 1 — HTTP-inbound /p/<profile>/ routing (webhook)

Serve webhook inbound for multiple profiles off the one shared listener via a
URL prefix, with no second port bound.

- SessionSource gains a 'profile' field (round-trips through to_dict/from_dict;
  omitted when unset so existing serialization is unchanged). It carries which
  profile an inbound message was routed to.
- WebhookAdapter registers /p/{profile}/webhooks/{route_name} alongside the
  existing /webhooks/{route_name}. _resolve_request_profile validates the
  prefix against profiles_to_serve(): None when absent or multiplexing is off
  (ignored, handled as default — no spurious 404), the profile name when valid,
  _PROFILE_REJECTED (→ 404) when the profile isn't served. The resolved profile
  is stamped onto the SessionSource.
- session-key namespacing and the per-turn home/credential scope now prefer
  source.profile: SessionStore._resolve_profile_for_key(source),
  _session_key_for_source fallback, and _resolve_profile_home_for_source all
  honor it (→ the agent turn resolves that profile's config/skills/credentials
  via the Phase 2 _profile_runtime_scope).

Constraint: routing inbound needs no per-profile platform credential, but the
agent still needs the routed profile's provider key — delivered by Phase 2's
secret scope. api_server (OpenAI-compatible surface) profile routing is a
focused follow-on; its source-construction path differs from webhook's.

Tests: SessionSource.profile round-trip + session-key namespacing;
_resolve_request_profile accept/reject/ignore matrix.
This commit is contained in:
Ben Barclay 2026-06-18 15:56:13 +10:00 committed by Teknium
parent f538470cf4
commit f35abb122a
4 changed files with 153 additions and 18 deletions

View file

@ -57,6 +57,11 @@ from gateway.platforms.base import (
logger = logging.getLogger(__name__)
# Sentinel returned by _resolve_request_profile when a /p/<profile>/ prefix
# names a profile this gateway does not serve (→ 404). Distinct from None
# (no prefix / multiplexing off → handle as the default profile).
# A unique object() so the webhook handler can test for it by identity
# (``is``) without it ever colliding with a real profile name.
_PROFILE_REJECTED = object()
_BUILTIN_DELIVER_PLATFORMS = {
"telegram", "discord", "slack", "signal", "sms", "whatsapp",
"matrix", "mattermost", "homeassistant", "email", "dingtalk",
@ -189,6 +194,14 @@ class WebhookAdapter(BasePlatformAdapter):
app = web.Application()
app.router.add_get("/health", self._handle_health)
app.router.add_post("/webhooks/{route_name}", self._handle_webhook)
# Multi-profile multiplexing: a /p/<profile>/webhooks/<route> prefix
# routes the inbound event to that profile. Same handler; the profile is
# captured from the path and stamped onto the SessionSource so the agent
# turn resolves that profile's config/skills/credentials. Only honored
# when gateway.multiplex_profiles is on (the handler validates).
app.router.add_post(
"/p/{profile}/webhooks/{route_name}", self._handle_webhook
)
# Port conflict detection — fail fast if port is already in use
import socket as _socket
@ -397,6 +410,35 @@ class WebhookAdapter(BasePlatformAdapter):
except Exception as e:
logger.error("[webhook] Failed to reload dynamic routes: %s", e)
def _resolve_request_profile(self, request: "web.Request"):
    """Resolve and validate the ``/p/<profile>/`` URL prefix on a webhook request.

    Args:
        request: The inbound request. Only ``request.match_info`` is read,
            looking for the optional ``profile`` path variable.

    Returns:
        - ``None`` when no profile prefix is present, or multiplexing is off
          (the prefix is ignored, request handled as the default profile).
        - the profile name (str) when present, multiplexing is on, and the
          profile is one this gateway serves.
        - ``_PROFILE_REJECTED`` when a prefix is present but the profile is
          unknown/unconfigured, or the served-profile list could not be
          resolved (handler returns 404).
    """
    profile = (request.match_info.get("profile") or "").strip()
    if not profile:
        return None

    cfg = getattr(self.gateway_runner, "config", None)
    if not getattr(cfg, "multiplex_profiles", False):
        # Prefix supplied but multiplexing is off — ignore it, behave as
        # the single-profile gateway (don't 404 a would-be valid route).
        return None

    try:
        from hermes_cli.profiles import profiles_to_serve

        served = {name for name, _ in profiles_to_serve(multiplex=True)}
    except Exception:
        # Fail closed: never route to a profile we could not validate. Log
        # the cause so a broken profile lookup is distinguishable from a
        # genuinely unknown profile (both surface to the client as 404).
        # %r keeps the caller-supplied path segment from forging log lines.
        logger.warning(
            "[webhook] Could not resolve served profiles; rejecting profile %r",
            profile,
            exc_info=True,
        )
        return _PROFILE_REJECTED

    if profile not in served:
        return _PROFILE_REJECTED
    return profile
async def _handle_webhook(self, request: "web.Request") -> "web.Response":
"""POST /webhooks/{route_name} — receive and process a webhook event."""
# Hot-reload dynamic subscriptions on each request (mtime-gated, cheap)
@ -405,6 +447,13 @@ class WebhookAdapter(BasePlatformAdapter):
route_name = request.match_info.get("route_name", "")
route_config = self._routes.get(route_name)
# Multi-profile: resolve + validate the /p/<profile>/ prefix if present.
profile = self._resolve_request_profile(request)
if profile is _PROFILE_REJECTED:
return web.json_response(
{"error": "Unknown or unconfigured profile"}, status=404
)
if not route_config:
return web.json_response(
{"error": f"Unknown route: {route_name}"}, status=404
@ -641,6 +690,8 @@ class WebhookAdapter(BasePlatformAdapter):
user_id=f"webhook:{route_name}",
user_name=route_name,
)
if profile and isinstance(profile, str):
source.profile = profile
event = MessageEvent(
text=prompt,
message_type=MessageType.TEXT,

View file

@ -2884,11 +2884,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
# agent:main) unless multiplexing is on, then the active profile.
_profile = None
if getattr(config, "multiplex_profiles", False):
try:
from hermes_cli.profiles import get_active_profile_name
_profile = get_active_profile_name() or "default"
except Exception:
_profile = None
if source.profile:
_profile = source.profile
else:
try:
from hermes_cli.profiles import get_active_profile_name
_profile = get_active_profile_name() or "default"
except Exception:
_profile = None
return build_session_key(
source,
group_sessions_per_user=getattr(config, "group_sessions_per_user", True),
@ -13902,14 +13905,13 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
def _resolve_profile_home_for_source(self, source: SessionSource) -> "Path":
"""Resolve which profile's HERMES_HOME should serve this inbound source.
Phase 2 baseline: the active profile (the multiplexer's own home). Phase
1/3 wire real per-source attribution (URL prefix, per-credential adapter
ownership) by overriding the resolved profile on the source/adapter; this
method is the single point they hook.
Prefers the profile the source was routed to (``source.profile`` set
by the /p/<profile>/ URL prefix or a per-credential adapter), falling
back to the active profile (the multiplexer's own home).
"""
from hermes_cli.profiles import get_active_profile_name, get_profile_dir
try:
name = get_active_profile_name() or "default"
name = (source.profile or "").strip() or get_active_profile_name() or "default"
return get_profile_dir(name)
except Exception:
from hermes_constants import get_hermes_home

View file

@ -92,6 +92,11 @@ class SessionSource:
parent_chat_id: Optional[str] = None # Parent channel when chat_id refers to a thread
message_id: Optional[str] = None # ID of the triggering message (for pin/reply/react)
role_authorized: bool = False # True when adapter granted access via role (not user ID)
# Profile this inbound message is routed to in a multiplexing gateway
# (from the /p/<profile>/ URL prefix or per-credential adapter ownership).
# None => the gateway's active/default profile. Drives both session-key
# namespacing and the per-turn config/credential scope.
profile: Optional[str] = None
@property
def description(self) -> str:
@ -135,6 +140,8 @@ class SessionSource:
d["parent_chat_id"] = self.parent_chat_id
if self.message_id:
d["message_id"] = self.message_id
if self.profile:
d["profile"] = self.profile
return d
@classmethod
@ -153,6 +160,7 @@ class SessionSource:
guild_id=data.get("guild_id"),
parent_chat_id=data.get("parent_chat_id"),
message_id=data.get("message_id"),
profile=data.get("profile"),
)
@ -802,18 +810,19 @@ class SessionStore:
logger.debug("Could not remove temp file %s: %s", tmp_path, e)
raise
def _resolve_profile_for_key(self) -> Optional[str]:
def _resolve_profile_for_key(self, source: Optional[SessionSource] = None) -> Optional[str]:
"""Return the profile namespace for session keys, or None when off.
Phase 0: when ``multiplex_profiles`` is disabled (default), returns
``None`` so keys stay in the legacy ``agent:main`` namespace
byte-identical to before. When enabled, returns the active profile name
so this store's keys are namespaced to it. Per-source profile
attribution (one store serving many profiles) arrives in a later phase;
until then the active profile is the correct namespace.
When ``multiplex_profiles`` is disabled (default), returns ``None`` so
keys stay in the legacy ``agent:main`` namespace byte-identical to
before. When enabled, prefers the profile the inbound source was routed
to (``source.profile`` set by the /p/<profile>/ URL prefix or
per-credential adapter), falling back to the active profile name.
"""
if not getattr(self.config, "multiplex_profiles", False):
return None
if source is not None and source.profile:
return source.profile
try:
from hermes_cli.profiles import get_active_profile_name
return get_active_profile_name() or "default"
@ -826,7 +835,7 @@ class SessionStore:
source,
group_sessions_per_user=getattr(self.config, "group_sessions_per_user", True),
thread_sessions_per_user=getattr(self.config, "thread_sessions_per_user", False),
profile=self._resolve_profile_for_key(),
profile=self._resolve_profile_for_key(source),
)
def _is_session_expired(self, entry: SessionEntry) -> bool:

View file

@ -0,0 +1,73 @@
"""Phase 1: HTTP-inbound /p/<profile>/ routing for the webhook adapter."""
import pytest
from gateway.config import GatewayConfig, Platform
from gateway.session import SessionSource, build_session_key
class TestSessionSourceProfileField:
    """Checks for the optional ``profile`` field on ``SessionSource``."""

    def test_profile_roundtrips(self):
        """A set profile survives ``to_dict`` followed by ``from_dict``."""
        # Use WEBHOOK when the Platform enum has it, otherwise TELEGRAM.
        platform = getattr(Platform, "WEBHOOK", Platform.TELEGRAM)
        original = SessionSource(
            platform=platform,
            chat_id="c1",
            chat_type="webhook",
            profile="coder",
        )
        payload = original.to_dict()
        assert SessionSource.from_dict(payload).profile == "coder"

    def test_profile_absent_not_serialized(self):
        """An unset profile adds no ``profile`` key to the serialized dict."""
        source = SessionSource(
            platform=Platform.TELEGRAM, chat_id="c1", chat_type="dm"
        )
        serialized = source.to_dict()
        assert "profile" not in serialized

    def test_source_profile_drives_session_key_namespace(self):
        """The explicit ``profile`` argument becomes the key's namespace."""
        source = SessionSource(
            platform=Platform.TELEGRAM, chat_id="99", chat_type="dm"
        )
        # build_session_key takes profile explicitly; the adapter passes
        # source.profile through. Verify the namespace follows it.
        key = build_session_key(source, profile="coder")
        assert key == "agent:coder:telegram:dm:99"
class TestWebhookProfileResolution:
    """_resolve_request_profile validates the /p/<profile>/ prefix."""

    def _adapter(self, multiplex: bool, served=("default", "coder")):
        """Build a bare adapter and the helpers each test needs.

        Returns a 4-tuple: the adapter, a fake request class, the
        ``_PROFILE_REJECTED`` sentinel, and the ``served`` names passed in.
        """
        from gateway.platforms.webhook import WebhookAdapter, _PROFILE_REJECTED

        class _FakeReq:
            """Stand-in request exposing only ``match_info``."""

            def __init__(self, profile):
                # No "profile" key at all models a request without the prefix.
                self.match_info = {} if profile is None else {"profile": profile}

        class _Runner:
            config = GatewayConfig(multiplex_profiles=multiplex)

        # Construct minimally; we only call _resolve_request_profile.
        adapter = WebhookAdapter.__new__(WebhookAdapter)
        adapter.gateway_runner = _Runner()
        return adapter, _FakeReq, _PROFILE_REJECTED, served

    @staticmethod
    def _serve(monkeypatch, names):
        """Patch ``profiles_to_serve`` to report exactly ``names``."""

        def _fake_profiles_to_serve(multiplex):
            return [(name, None) for name in names]

        monkeypatch.setattr(
            "hermes_cli.profiles.profiles_to_serve", _fake_profiles_to_serve
        )

    def test_no_prefix_returns_none(self):
        """Without a prefix the resolver yields ``None``."""
        adapter, make_request, _rejected, _served = self._adapter(multiplex=True)
        outcome = adapter._resolve_request_profile(make_request(None))
        assert outcome is None

    def test_prefix_ignored_when_multiplex_off(self):
        """With multiplexing off, any prefix is ignored rather than rejected."""
        adapter, make_request, _rejected, _served = self._adapter(multiplex=False)
        # Even a bogus profile is ignored (not 404'd) when multiplexing is off.
        outcome = adapter._resolve_request_profile(make_request("anything"))
        assert outcome is None

    def test_known_profile_accepted(self, monkeypatch):
        """A served profile name is returned unchanged."""
        adapter, make_request, _rejected, served = self._adapter(multiplex=True)
        self._serve(monkeypatch, served)
        outcome = adapter._resolve_request_profile(make_request("coder"))
        assert outcome == "coder"

    def test_unknown_profile_rejected(self, monkeypatch):
        """A profile that is not served yields the rejection sentinel."""
        adapter, make_request, rejected, served = self._adapter(multiplex=True)
        self._serve(monkeypatch, served)
        outcome = adapter._resolve_request_profile(make_request("ghost"))
        assert outcome is rejected