feat(gateway): multiplex phase 3 — secondary-profile adapter registry + conflict detection

Bring up adapters for every profile the gateway serves, not just the active
one. Keeps self.adapters as the default/active profile's map (the ~93 existing
self.adapters[...] sites are untouched) and adds secondary profiles under
self._profile_adapters[profile][platform].

- _start_secondary_profile_adapters loops profiles_to_serve(multiplex=True),
  skips the active profile (handled by the primary startup loop), and for each
  other profile loads its gateway config and creates+connects its enabled
  adapters under that profile's _profile_runtime_scope (home + secret scope).
- Each secondary adapter gets _make_profile_message_handler(profile): stamps
  source.profile (when unset) before delegating to the shared _handle_message,
  so the agent turn and session key resolve to that profile.
- Same-platform credential-conflict detection: _adapter_credential_fingerprint
  hashes the adapter's bot token (salted, truncated — never logs the token);
  two profiles claiming the same (platform, token) refuse the duplicate with a
  clear error naming both, since one token can't be polled twice.
- Port-binding hard-error: a SECONDARY profile that enables a port-binding
  platform (webhook, api_server, msgraph_webhook, feishu, wecom_callback,
  bluebubbles, sms) is a config error and aborts startup via MultiplexConfigError
  — the default profile owns the single shared HTTP listener and serves every
  profile through the /p/<profile>/ prefix, so a second bind can only collide.
  Distinct from a transient connect failure (which logs + stays alive to retry):
  a config error writes gateway_state=startup_failed and exits cleanly with an
  actionable message (names the profile, the platform, and the fix). There is no
  valid reason to bind a second port once you've opted into a multiplexer.
- Shutdown tears down secondary adapters alongside the primary ones.
- Defensive getattr guards keep partial-construction unit tests (stop(),
  _run_agent on bare instances) working.

No-op when multiplex_profiles is off (self._profile_adapters stays empty).

Tests: fingerprint stability/log-safety/distinctness, profile message-handler
stamping (and not overriding an already-stamped source), port-binding hard-error
raises + names the profile/platform, non-binding platform is not rejected, and
the guard set covers every TCP-binding adapter.
This commit is contained in:
Ben Barclay 2026-06-18 16:05:22 +10:00 committed by Teknium
parent f35abb122a
commit d5d02eabb0
2 changed files with 370 additions and 2 deletions

View file

@ -1239,6 +1239,33 @@ def _current_max_iterations() -> int:
from contextlib import contextmanager as _contextmanager
# Platforms that bind a host TCP port (HTTP/webhook listeners). In a profile
# multiplexer the default profile owns the single shared listener and serves
# every profile through the /p/<profile>/ URL prefix, so a SECONDARY profile
# enabling one of these is always a misconfiguration: it would try to bind a
# port already held by the default's listener. We hard-error on it rather than
# silently dropping the adapter (see _start_one_profile_adapters).
# Stored as platform .value strings since the Platform enum is imported below.
_PORT_BINDING_PLATFORM_VALUES = frozenset({
"webhook",
"api_server",
"msgraph_webhook",
"feishu",
"wecom_callback",
"bluebubbles",
"sms",
})
class MultiplexConfigError(RuntimeError):
"""A profile multiplexer config is invalid (fail-fast at startup).
Distinct from a transient adapter-connect failure: a transient error is
logged and the gateway stays alive to retry, but a config error means the
operator must fix config.yaml, so it aborts startup cleanly.
"""
@_contextmanager
def _profile_runtime_scope(profile_home: "Path"):
"""Scope config/skills/memory AND credentials to a profile for one turn.
@ -2328,6 +2355,12 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
except Exception:
logger.debug("could not set multiplex-active flag", exc_info=True)
self.adapters: Dict[Platform, BasePlatformAdapter] = {}
# Multi-profile multiplexing: adapters for NON-default profiles live
# here, keyed by profile name then Platform. self.adapters stays the
# default/active profile's map so the ~93 existing self.adapters[...]
# sites are untouched when multiplexing is off (this dict is empty).
# Populated by _start_secondary_profile_adapters().
self._profile_adapters: Dict[str, Dict[Platform, BasePlatformAdapter]] = {}
self._warn_if_docker_media_delivery_is_risky()
_gateway_runner_ref = _weakref.ref(self)
@ -5436,7 +5469,30 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
"attempts": 1,
"next_retry": time.monotonic() + 30,
}
# Multi-profile multiplexing: bring up adapters for every OTHER profile
# this gateway serves. Each profile's adapters connect under that
# profile's home + credential scope and stamp their inbound events with
# the profile so the agent turn resolves correctly. No-op when off.
try:
_secondary_connected = await self._start_secondary_profile_adapters()
connected_count += _secondary_connected
except MultiplexConfigError as e:
# Invalid multiplexer config — abort startup cleanly so the operator
# fixes config.yaml rather than running a half-wired gateway.
reason = str(e)
logger.error("Gateway multiplexer config error: %s", reason)
try:
from gateway.status import write_runtime_status
write_runtime_status(gateway_state="startup_failed", exit_reason=reason)
except Exception:
pass
self._request_clean_exit(reason)
self._startup_restore_in_progress = False
return True
except Exception as e:
logger.error("Secondary-profile adapter startup failed: %s", e, exc_info=True)
if connected_count == 0:
if startup_nonretryable_errors:
reason = "; ".join(startup_nonretryable_errors)
@ -6443,6 +6499,22 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
time.monotonic() - _adapter_started_at,
e,
)
# Disconnect secondary-profile adapters (multiplex mode).
for _prof, _amap in list(getattr(self, "_profile_adapters", {}).items()):
for platform, adapter in list(_amap.items()):
try:
await adapter.cancel_background_tasks()
except Exception as e:
logger.debug("%s bg-cancel error (profile %s): %s", platform.value, _prof, e)
try:
await adapter.disconnect()
logger.info("%s disconnected (profile: %s)", platform.value, _prof)
except Exception as e:
logger.error("%s disconnect error (profile %s): %s", platform.value, _prof, e)
_amap.clear()
if hasattr(self, "_profile_adapters"):
self._profile_adapters.clear()
logger.info(
"Shutdown phase: all adapters disconnected at +%.2fs",
_phase_elapsed(),
@ -6612,6 +6684,166 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
"""Wait for shutdown signal."""
await self._shutdown_event.wait()
async def _start_secondary_profile_adapters(self) -> int:
"""Bring up adapters for every non-active profile this gateway serves.
Returns the number of secondary adapters that connected. No-op (returns
0) unless ``gateway.multiplex_profiles`` is on.
Each profile's adapters are created and connected under that profile's
HERMES_HOME + secret scope (``_profile_runtime_scope``), stored in
``self._profile_adapters[profile]``, and given a message handler that
stamps ``source.profile`` before delegating to the shared
``_handle_message`` so the agent turn resolves that profile's config,
skills, and credentials. Same-platform credential collisions (two
profiles polling the same bot token) are detected and refused here, the
only point that sees every profile's resolved credentials together.
"""
if not getattr(self.config, "multiplex_profiles", False):
return 0
try:
from hermes_cli.profiles import profiles_to_serve, get_active_profile_name
except Exception:
return 0
active = get_active_profile_name() or "default"
connected = 0
# (platform, token-fingerprint) -> profile that claimed it. Detects two
# profiles trying to poll the same bot credential (impossible to do
# concurrently). Seed with the active profile's adapters.
claimed: Dict[tuple, str] = {}
for _plat, _ad in self.adapters.items():
fp = self._adapter_credential_fingerprint(_ad)
if fp is not None:
claimed[(_plat, fp)] = active
for profile_name, profile_home in profiles_to_serve(multiplex=True):
if profile_name == active:
continue # handled by the primary startup loop
try:
connected += await self._start_one_profile_adapters(
profile_name, profile_home, claimed
)
except MultiplexConfigError:
# Config error (e.g. a secondary profile binding a port) is not
# transient — propagate so startup aborts cleanly instead of
# limping along with a half-configured multiplexer.
raise
except Exception as e:
logger.error(
"Failed to start adapters for profile '%s': %s",
profile_name, e, exc_info=True,
)
return connected
async def _start_one_profile_adapters(
self, profile_name: str, profile_home: "Path", claimed: Dict[tuple, str]
) -> int:
"""Create+connect one profile's adapters under its runtime scope."""
from gateway.config import load_gateway_config
with _profile_runtime_scope(profile_home):
profile_cfg = load_gateway_config()
profile_map = self._profile_adapters.setdefault(profile_name, {})
connected = 0
for platform, platform_config in profile_cfg.platforms.items():
if not platform_config.enabled:
continue
# A secondary profile must NOT enable a port-binding platform: the
# default profile's listener already serves every profile via the
# /p/<profile>/ prefix, so a second bind can only collide. This is a
# config error, not a transient failure — fail fast and loud.
if platform.value in _PORT_BINDING_PLATFORM_VALUES:
raise MultiplexConfigError(
f"Profile '{profile_name}' enables the port-binding platform "
f"'{platform.value}', but gateway.multiplex_profiles is on. The "
f"default profile owns the single shared HTTP listener and "
f"serves every profile through the /p/{profile_name}/ URL "
f"prefix — a secondary profile cannot bind its own port. "
f"Remove platforms.{platform.value} from profile "
f"'{profile_name}'s config.yaml (configure it only on the "
f"default profile)."
)
with _profile_runtime_scope(profile_home):
adapter = self._create_adapter(platform, platform_config)
if not adapter:
continue
# Same-token conflict detection — refuse a duplicate poll.
fp = self._adapter_credential_fingerprint(adapter)
if fp is not None:
owner = claimed.get((platform, fp))
if owner is not None:
logger.error(
"Profile '%s' and '%s' both configure %s with the same "
"credential — refusing to start the duplicate (a single "
"bot token cannot be polled twice). Give each profile its "
"own %s credential.",
owner, profile_name, platform.value, platform.value,
)
await self._safe_adapter_disconnect(adapter, platform)
continue
claimed[(platform, fp)] = profile_name
# Stamp every inbound event from this adapter with its profile so
# the agent turn (and session key) resolve to the right home.
adapter.set_message_handler(
self._make_profile_message_handler(profile_name)
)
adapter.set_fatal_error_handler(self._handle_adapter_fatal_error)
adapter.set_session_store(self.session_store)
adapter.set_busy_session_handler(self._handle_active_session_busy_message)
adapter.set_topic_recovery_fn(self._recover_telegram_topic_thread_id)
adapter._busy_text_mode = self._busy_text_mode
try:
with _profile_runtime_scope(profile_home):
success = await self._connect_adapter_with_timeout(adapter, platform)
if success:
profile_map[platform] = adapter
connected += 1
logger.info("%s connected (profile: %s)", platform.value, profile_name)
else:
logger.warning("%s failed to connect (profile: %s)", platform.value, profile_name)
await self._safe_adapter_disconnect(adapter, platform)
except Exception as e:
logger.error("%s error (profile: %s): %s", platform.value, profile_name, e)
await self._safe_adapter_disconnect(adapter, platform)
return connected
def _make_profile_message_handler(self, profile_name: str):
"""Return a message handler that stamps source.profile then delegates."""
async def _handler(event):
try:
if getattr(event, "source", None) is not None and not event.source.profile:
event.source.profile = profile_name
except Exception:
pass
return await self._handle_message(event)
return _handler
@staticmethod
def _adapter_credential_fingerprint(adapter: Any) -> Optional[str]:
"""Return a stable, log-safe fingerprint of an adapter's credential.
Used only to detect two profiles claiming the same bot token. Returns a
salted hash (never the token itself) of the adapter's primary
credential, or None when no credential is discoverable (in which case
we don't attempt conflict detection for it).
"""
token = None
for attr in ("token", "bot_token", "_token", "api_token", "_bot_token"):
val = getattr(adapter, attr, None)
if isinstance(val, str) and val.strip():
token = val.strip()
break
if not token:
return None
import hashlib
return hashlib.sha256(("hermes-mux:" + token).encode("utf-8")).hexdigest()[:16]
def _create_adapter(
self,
platform: Platform,
@ -13883,7 +14115,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
multiplexing is off this is a transparent pass-through zero behavior
change for single-profile gateways.
"""
if not getattr(self.config, "multiplex_profiles", False):
if not getattr(getattr(self, "config", None), "multiplex_profiles", False):
return await self._run_agent_inner(
message, context_prompt, history, source, session_id,
session_key=session_key, run_generation=run_generation,

View file

@ -0,0 +1,136 @@
"""Phase 3: secondary-profile adapter registry + same-token conflict detection."""
import pytest
from gateway.run import GatewayRunner
class _FakeAdapter:
def __init__(self, token=None):
self.token = token
class TestCredentialFingerprint:
def test_none_without_token(self):
assert GatewayRunner._adapter_credential_fingerprint(_FakeAdapter()) is None
def test_stable_and_log_safe(self):
a = _FakeAdapter(token="secret-bot-token")
fp1 = GatewayRunner._adapter_credential_fingerprint(a)
fp2 = GatewayRunner._adapter_credential_fingerprint(_FakeAdapter(token="secret-bot-token"))
assert fp1 == fp2 # stable
assert "secret-bot-token" not in (fp1 or "") # never the raw token
assert len(fp1) == 16
def test_distinct_tokens_distinct_fp(self):
a = GatewayRunner._adapter_credential_fingerprint(_FakeAdapter(token="tok-A"))
b = GatewayRunner._adapter_credential_fingerprint(_FakeAdapter(token="tok-B"))
assert a != b
def test_reads_alt_attrs(self):
class _AltAdapter:
def __init__(self):
self.bot_token = "alt-token"
assert GatewayRunner._adapter_credential_fingerprint(_AltAdapter()) is not None
class TestProfileMessageHandler:
@pytest.mark.asyncio
async def test_stamps_profile_on_unstamped_source(self):
runner = GatewayRunner.__new__(GatewayRunner)
seen = {}
async def _fake_handle(event):
seen["profile"] = event.source.profile
return "ok"
runner._handle_message = _fake_handle
handler = runner._make_profile_message_handler("coder")
class _Src:
profile = None
class _Evt:
source = _Src()
result = await handler(_Evt())
assert result == "ok"
assert seen["profile"] == "coder"
@pytest.mark.asyncio
async def test_does_not_override_existing_profile(self):
runner = GatewayRunner.__new__(GatewayRunner)
seen = {}
async def _fake_handle(event):
seen["profile"] = event.source.profile
return "ok"
runner._handle_message = _fake_handle
handler = runner._make_profile_message_handler("coder")
class _Src:
profile = "writer" # already stamped (e.g. by URL prefix)
class _Evt:
source = _Src()
await handler(_Evt())
assert seen["profile"] == "writer"
class TestPortBindingHardError:
"""A secondary profile enabling a port-binding platform aborts startup."""
@pytest.mark.asyncio
async def test_secondary_webhook_raises(self, monkeypatch):
from gateway.run import MultiplexConfigError
from gateway.config import GatewayConfig, Platform, PlatformConfig
runner = GatewayRunner.__new__(GatewayRunner)
runner.config = GatewayConfig(multiplex_profiles=True)
runner._profile_adapters = {}
# reviewer profile config enables webhook (a port-binding platform)
reviewer_cfg = GatewayConfig(multiplex_profiles=True)
reviewer_cfg.platforms = {
Platform.WEBHOOK: PlatformConfig(enabled=True, extra={"port": 8644}),
}
monkeypatch.setattr(
"gateway.config.load_gateway_config", lambda: reviewer_cfg
)
with pytest.raises(MultiplexConfigError) as ei:
await runner._start_one_profile_adapters("reviewer", "/tmp/x", {})
assert "webhook" in str(ei.value)
assert "reviewer" in str(ei.value)
@pytest.mark.asyncio
async def test_secondary_non_binding_platform_ok(self, monkeypatch):
"""A non-port-binding platform (e.g. telegram) is NOT rejected."""
from gateway.config import GatewayConfig, Platform, PlatformConfig
runner = GatewayRunner.__new__(GatewayRunner)
runner.config = GatewayConfig(multiplex_profiles=True)
runner._profile_adapters = {}
reviewer_cfg = GatewayConfig(multiplex_profiles=True)
reviewer_cfg.platforms = {
Platform.TELEGRAM: PlatformConfig(enabled=True, token="t"),
}
monkeypatch.setattr(
"gateway.config.load_gateway_config", lambda: reviewer_cfg
)
# _create_adapter returns None here (no real telegram token wiring), so
# the loop simply connects nothing — the key assertion is NO raise.
monkeypatch.setattr(runner, "_create_adapter", lambda p, c: None)
connected = await runner._start_one_profile_adapters("reviewer", "/tmp/x", {})
assert connected == 0 # nothing connected, but no MultiplexConfigError
def test_port_binding_set_covers_known_listeners(self):
from gateway.run import _PORT_BINDING_PLATFORM_VALUES
# Every adapter that binds a TCP port must be in the guard set.
for p in ("webhook", "api_server", "msgraph_webhook", "feishu",
"wecom_callback", "bluebubbles", "sms"):
assert p in _PORT_BINDING_PLATFORM_VALUES