hermes-agent/tests/gateway/test_multiplex_adapter_registry.py
Ben Barclay d5d02eabb0 feat(gateway): multiplex phase 3 — secondary-profile adapter registry + conflict detection
Bring up adapters for every profile the gateway serves, not just the active
one. Keeps self.adapters as the default/active profile's map (the ~93 existing
self.adapters[...] sites are untouched) and adds secondary profiles under
self._profile_adapters[profile][platform].

- _start_secondary_profile_adapters loops profiles_to_serve(multiplex=True),
  skips the active profile (handled by the primary startup loop), and for each
  other profile loads its gateway config and creates+connects its enabled
  adapters under that profile's _profile_runtime_scope (home + secret scope).
- Each secondary adapter gets _make_profile_message_handler(profile): stamps
  source.profile (when unset) before delegating to the shared _handle_message,
  so the agent turn and session key resolve to that profile.
- Same-platform credential-conflict detection: _adapter_credential_fingerprint
  hashes the adapter's bot token (salted, truncated — never logs the token);
  two profiles claiming the same (platform, token) refuse the duplicate with a
  clear error naming both, since one token can't be polled twice.
- Port-binding hard-error: a SECONDARY profile that enables a port-binding
  platform (webhook, api_server, msgraph_webhook, feishu, wecom_callback,
  bluebubbles, sms) is a config error and aborts startup via MultiplexConfigError
  — the default profile owns the single shared HTTP listener and serves every
  profile through the /p/<profile>/ prefix, so a second bind can only collide.
  Distinct from a transient connect failure (which logs + stays alive to retry):
  a config error writes gateway_state=startup_failed and exits cleanly with an
  actionable message (names the profile, the platform, and the fix). There is no
  valid reason to bind a second port once you've opted into a multiplexer.
- Shutdown tears down secondary adapters alongside the primary ones.
- Defensive getattr guards keep partial-construction unit tests (stop(),
  _run_agent on bare instances) working.

No-op when multiplex_profiles is off (self._profile_adapters stays empty).

Tests: fingerprint stability/log-safety/distinctness, profile message-handler
stamping (and not overriding an already-stamped source), port-binding hard-error
raises + names the profile/platform, non-binding platform is not rejected, and
the guard set covers every TCP-binding adapter.
2026-06-19 07:34:15 -07:00

136 lines
4.9 KiB
Python

"""Phase 3: secondary-profile adapter registry + same-token conflict detection."""
import pytest
from gateway.run import GatewayRunner
class _FakeAdapter:
def __init__(self, token=None):
self.token = token
class TestCredentialFingerprint:
    """Checks for ``GatewayRunner._adapter_credential_fingerprint``."""

    def test_none_without_token(self):
        """An adapter whose ``token`` is None yields no fingerprint."""
        assert GatewayRunner._adapter_credential_fingerprint(_FakeAdapter()) is None

    def test_stable_and_log_safe(self):
        """Equal tokens give equal 16-character fingerprints that omit the token."""
        a = _FakeAdapter(token="secret-bot-token")
        fp1 = GatewayRunner._adapter_credential_fingerprint(a)
        fp2 = GatewayRunner._adapter_credential_fingerprint(
            _FakeAdapter(token="secret-bot-token")
        )
        # Guard first so a missing fingerprint fails as a readable assertion
        # instead of a TypeError from len(None) further down.
        assert fp1 is not None
        assert fp1 == fp2  # stable
        assert "secret-bot-token" not in fp1  # never the raw token
        assert len(fp1) == 16

    def test_distinct_tokens_distinct_fp(self):
        """Different tokens must not share a fingerprint."""
        a = GatewayRunner._adapter_credential_fingerprint(_FakeAdapter(token="tok-A"))
        b = GatewayRunner._adapter_credential_fingerprint(_FakeAdapter(token="tok-B"))
        # Without these guards, "None != <fingerprint>" would pass vacuously.
        assert a is not None
        assert b is not None
        assert a != b

    def test_reads_alt_attrs(self):
        """A credential stored under ``bot_token`` is also fingerprinted."""

        class _AltAdapter:
            def __init__(self):
                self.bot_token = "alt-token"

        assert GatewayRunner._adapter_credential_fingerprint(_AltAdapter()) is not None
class TestProfileMessageHandler:
    """Checks for the handler built by ``_make_profile_message_handler``."""

    @pytest.mark.asyncio
    async def test_stamps_profile_on_unstamped_source(self):
        """A source with ``profile = None`` reaches the delegate stamped "coder"."""
        # __new__ skips GatewayRunner.__init__; only the attribute set below
        # is provided on this bare instance.
        runner = GatewayRunner.__new__(GatewayRunner)
        seen = {}

        async def _fake_handle(event):
            # Record the profile visible to the delegate at call time.
            seen["profile"] = event.source.profile
            return "ok"

        runner._handle_message = _fake_handle
        handler = runner._make_profile_message_handler("coder")

        class _Src:
            profile = None

        class _Evt:
            source = _Src()

        result = await handler(_Evt())
        assert result == "ok"
        assert seen["profile"] == "coder"

    @pytest.mark.asyncio
    async def test_does_not_override_existing_profile(self):
        """A source already stamped "writer" is passed through unchanged."""
        runner = GatewayRunner.__new__(GatewayRunner)
        seen = {}

        async def _fake_handle(event):
            seen["profile"] = event.source.profile
            return "ok"

        runner._handle_message = _fake_handle
        handler = runner._make_profile_message_handler("coder")

        class _Src:
            profile = "writer"  # already stamped (e.g. by URL prefix)

        class _Evt:
            source = _Src()

        await handler(_Evt())
        assert seen["profile"] == "writer"
class TestPortBindingHardError:
    """A secondary profile enabling a port-binding platform aborts startup."""

    @pytest.mark.asyncio
    async def test_secondary_webhook_raises(self, monkeypatch):
        """Enabling webhook on the "reviewer" profile raises MultiplexConfigError."""
        from gateway.run import MultiplexConfigError
        from gateway.config import GatewayConfig, Platform, PlatformConfig

        # Bare instance: __init__ is skipped, so only the attributes assigned
        # here exist on the runner.
        runner = GatewayRunner.__new__(GatewayRunner)
        runner.config = GatewayConfig(multiplex_profiles=True)
        runner._profile_adapters = {}
        # reviewer profile config enables webhook (a port-binding platform)
        reviewer_cfg = GatewayConfig(multiplex_profiles=True)
        reviewer_cfg.platforms = {
            Platform.WEBHOOK: PlatformConfig(enabled=True, extra={"port": 8644}),
        }
        monkeypatch.setattr(
            "gateway.config.load_gateway_config", lambda: reviewer_cfg
        )
        with pytest.raises(MultiplexConfigError) as ei:
            await runner._start_one_profile_adapters("reviewer", "/tmp/x", {})
        # Checked after the ``with`` block: statements following the raising
        # call inside it would never execute.
        assert "webhook" in str(ei.value)
        assert "reviewer" in str(ei.value)

    @pytest.mark.asyncio
    async def test_secondary_non_binding_platform_ok(self, monkeypatch):
        """A non-port-binding platform (e.g. telegram) is NOT rejected."""
        from gateway.config import GatewayConfig, Platform, PlatformConfig

        runner = GatewayRunner.__new__(GatewayRunner)
        runner.config = GatewayConfig(multiplex_profiles=True)
        runner._profile_adapters = {}
        reviewer_cfg = GatewayConfig(multiplex_profiles=True)
        reviewer_cfg.platforms = {
            Platform.TELEGRAM: PlatformConfig(enabled=True, token="t"),
        }
        monkeypatch.setattr(
            "gateway.config.load_gateway_config", lambda: reviewer_cfg
        )
        # _create_adapter returns None here (no real telegram token wiring), so
        # the loop simply connects nothing — the key assertion is NO raise.
        monkeypatch.setattr(runner, "_create_adapter", lambda p, c: None)
        connected = await runner._start_one_profile_adapters("reviewer", "/tmp/x", {})
        assert connected == 0  # nothing connected, but no MultiplexConfigError

    def test_port_binding_set_covers_known_listeners(self):
        """Every known TCP-binding platform name is present in the guard set."""
        from gateway.run import _PORT_BINDING_PLATFORM_VALUES

        # Every adapter that binds a TCP port must be in the guard set.
        expected = (
            "webhook",
            "api_server",
            "msgraph_webhook",
            "feishu",
            "wecom_callback",
            "bluebubbles",
            "sms",
        )
        # Gather all gaps up front so one failure reports every missing
        # platform rather than only the first one encountered.
        missing = [p for p in expected if p not in _PORT_BINDING_PLATFORM_VALUES]
        assert not missing, f"port-binding guard set is missing: {missing}"