diff --git a/gateway/run.py b/gateway/run.py index 4d3e22c412c..2d42dfd2d9f 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1239,6 +1239,33 @@ def _current_max_iterations() -> int: from contextlib import contextmanager as _contextmanager +# Platforms that bind a host TCP port (HTTP/webhook listeners). In a profile +# multiplexer the default profile owns the single shared listener and serves +# every profile through the /p/{profile}/ URL prefix, so a SECONDARY profile +# enabling one of these is always a misconfiguration: it would try to bind a +# port already held by the default's listener. We hard-error on it rather than +# silently dropping the adapter (see _start_one_profile_adapters). +# Stored as platform .value strings since the Platform enum is imported below. +_PORT_BINDING_PLATFORM_VALUES = frozenset({ + "webhook", + "api_server", + "msgraph_webhook", + "feishu", + "wecom_callback", + "bluebubbles", + "sms", +}) + + +class MultiplexConfigError(RuntimeError): + """A profile multiplexer config is invalid (fail-fast at startup). + + Distinct from a transient adapter-connect failure: a transient error is + logged and the gateway stays alive to retry, but a config error means the + operator must fix config.yaml, so it aborts startup cleanly. + """ + + @_contextmanager def _profile_runtime_scope(profile_home: "Path"): """Scope config/skills/memory AND credentials to a profile for one turn. @@ -2328,6 +2355,12 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except Exception: logger.debug("could not set multiplex-active flag", exc_info=True) self.adapters: Dict[Platform, BasePlatformAdapter] = {} + # Multi-profile multiplexing: adapters for NON-default profiles live + # here, keyed by profile name then Platform. self.adapters stays the + # default/active profile's map so the ~93 existing self.adapters[...] + # sites are untouched when multiplexing is off (this dict is empty). + # Populated by _start_secondary_profile_adapters(). 
+ self._profile_adapters: Dict[str, Dict[Platform, BasePlatformAdapter]] = {} self._warn_if_docker_media_delivery_is_risky() _gateway_runner_ref = _weakref.ref(self) @@ -5436,7 +5469,30 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew "attempts": 1, "next_retry": time.monotonic() + 30, } - + + # Multi-profile multiplexing: bring up adapters for every OTHER profile + # this gateway serves. Each profile's adapters connect under that + # profile's home + credential scope and stamp their inbound events with + # the profile so the agent turn resolves correctly. No-op when off. + try: + _secondary_connected = await self._start_secondary_profile_adapters() + connected_count += _secondary_connected + except MultiplexConfigError as e: + # Invalid multiplexer config — abort startup cleanly so the operator + # fixes config.yaml rather than running a half-wired gateway. + reason = str(e) + logger.error("Gateway multiplexer config error: %s", reason) + try: + from gateway.status import write_runtime_status + write_runtime_status(gateway_state="startup_failed", exit_reason=reason) + except Exception: + pass + self._request_clean_exit(reason) + self._startup_restore_in_progress = False + return True + except Exception as e: + logger.error("Secondary-profile adapter startup failed: %s", e, exc_info=True) + if connected_count == 0: if startup_nonretryable_errors: reason = "; ".join(startup_nonretryable_errors) @@ -6443,6 +6499,22 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew time.monotonic() - _adapter_started_at, e, ) + + # Disconnect secondary-profile adapters (multiplex mode). 
async def _start_secondary_profile_adapters(self) -> int:
    """Bring up adapters for every non-active profile this gateway serves.

    No-op (returns 0) unless ``gateway.multiplex_profiles`` is on.

    Each profile is handed to ``_start_one_profile_adapters`` together with a
    shared ``claimed`` map of ``(platform, credential-fingerprint) -> profile``
    that is seeded from the active profile's adapters in ``self.adapters``.
    That shared map is how two profiles configuring the same credential for
    the same platform are detected: this is the only place that walks every
    served profile.

    Returns:
        The number of secondary-profile adapters that connected.

    Raises:
        MultiplexConfigError: re-raised unchanged from
            ``_start_one_profile_adapters`` so the caller can abort startup.
            Any other per-profile exception is logged and that profile is
            skipped.
    """
    if not getattr(self.config, "multiplex_profiles", False):
        return 0

    try:
        from hermes_cli.profiles import get_active_profile_name, profiles_to_serve
    except Exception:
        # Multiplexing was explicitly enabled, so say why no secondary
        # profile is being served instead of quietly returning 0.
        logger.warning(
            "gateway.multiplex_profiles is on but hermes_cli.profiles could "
            "not be imported; secondary profiles will not be served",
            exc_info=True,
        )
        return 0

    active = get_active_profile_name() or "default"

    # (platform, token-fingerprint) -> profile that claimed it. Adapters with
    # no discoverable credential (fingerprint None) are not tracked.
    claimed: Dict[tuple, str] = {}
    for primary_platform, primary_adapter in self.adapters.items():
        fingerprint = self._adapter_credential_fingerprint(primary_adapter)
        if fingerprint is not None:
            claimed[(primary_platform, fingerprint)] = active

    connected = 0
    for profile_name, profile_home in profiles_to_serve(multiplex=True):
        if profile_name == active:
            continue  # handled by the primary startup loop
        try:
            connected += await self._start_one_profile_adapters(
                profile_name, profile_home, claimed
            )
        except MultiplexConfigError:
            # A config error is not transient: propagate so startup aborts
            # instead of running a half-configured multiplexer.
            raise
        except Exception as e:
            logger.error(
                "Failed to start adapters for profile '%s': %s",
                profile_name, e, exc_info=True,
            )
    return connected


async def _start_one_profile_adapters(
    self, profile_name: str, profile_home: "Path", claimed: Dict[tuple, str]
) -> int:
    """Create and connect one profile's adapters under its runtime scope.

    The profile's gateway config is loaded inside
    ``_profile_runtime_scope(profile_home)``. Every enabled platform is
    validated *before* any adapter is created, so an invalid profile never
    leaves a half-connected set of adapters behind.

    Args:
        profile_name: Name used to key ``self._profile_adapters`` and to stamp
            inbound events.
        profile_home: Home directory passed to ``_profile_runtime_scope``.
        claimed: Shared ``(platform, fingerprint) -> profile`` map; updated in
            place with the credentials this profile claims.

    Returns:
        The number of this profile's adapters that connected.

    Raises:
        MultiplexConfigError: if the profile enables a platform listed in
            ``_PORT_BINDING_PLATFORM_VALUES``.
    """
    from gateway.config import load_gateway_config

    with _profile_runtime_scope(profile_home):
        profile_cfg = load_gateway_config()

    enabled_platforms = [
        (platform, platform_config)
        for platform, platform_config in profile_cfg.platforms.items()
        if platform_config.enabled
    ]

    # A secondary profile must NOT enable a port-binding platform: the default
    # profile's listener already serves every profile via the /p/{profile}/
    # prefix, so a second bind can only collide. Checked for ALL platforms up
    # front so nothing is connected (or registered) for an invalid profile.
    for platform, _ in enabled_platforms:
        if platform.value in _PORT_BINDING_PLATFORM_VALUES:
            raise MultiplexConfigError(
                f"Profile '{profile_name}' enables the port-binding platform "
                f"'{platform.value}', but gateway.multiplex_profiles is on. The "
                f"default profile owns the single shared HTTP listener and "
                f"serves every profile through the /p/{profile_name}/ URL "
                f"prefix — a secondary profile cannot bind its own port. "
                f"Remove platforms.{platform.value} from profile "
                f"'{profile_name}'s config.yaml (configure it only on the "
                f"default profile)."
            )

    profile_map = self._profile_adapters.setdefault(profile_name, {})
    connected = 0
    for platform, platform_config in enabled_platforms:
        with _profile_runtime_scope(profile_home):
            adapter = self._create_adapter(platform, platform_config)
        if not adapter:
            continue

        # Same-token conflict detection — refuse a duplicate poll.
        fingerprint = self._adapter_credential_fingerprint(adapter)
        if fingerprint is not None:
            owner = claimed.get((platform, fingerprint))
            if owner is not None:
                logger.error(
                    "Profile '%s' and '%s' both configure %s with the same "
                    "credential — refusing to start the duplicate (a single "
                    "bot token cannot be polled twice). Give each profile its "
                    "own %s credential.",
                    owner, profile_name, platform.value, platform.value,
                )
                await self._safe_adapter_disconnect(adapter, platform)
                continue
            claimed[(platform, fingerprint)] = profile_name

        # Stamp every inbound event from this adapter with its profile so the
        # shared message handler can tell which profile it belongs to.
        adapter.set_message_handler(
            self._make_profile_message_handler(profile_name)
        )
        adapter.set_fatal_error_handler(self._handle_adapter_fatal_error)
        adapter.set_session_store(self.session_store)
        adapter.set_busy_session_handler(self._handle_active_session_busy_message)
        adapter.set_topic_recovery_fn(self._recover_telegram_topic_thread_id)
        adapter._busy_text_mode = self._busy_text_mode

        try:
            with _profile_runtime_scope(profile_home):
                success = await self._connect_adapter_with_timeout(adapter, platform)
            if success:
                profile_map[platform] = adapter
                connected += 1
                logger.info("✓ %s connected (profile: %s)", platform.value, profile_name)
            else:
                logger.warning("✗ %s failed to connect (profile: %s)", platform.value, profile_name)
                await self._safe_adapter_disconnect(adapter, platform)
        except Exception as e:
            # One adapter failing must not stop the profile's other adapters;
            # keep the traceback so the failure is diagnosable.
            logger.error(
                "✗ %s error (profile: %s): %s",
                platform.value, profile_name, e, exc_info=True,
            )
            await self._safe_adapter_disconnect(adapter, platform)
    return connected


def _make_profile_message_handler(self, profile_name: str):
    """Return a message handler that stamps ``source.profile`` then delegates.

    The returned coroutine sets ``event.source.profile`` to ``profile_name``
    when the event has a source whose profile is missing or falsy; an
    already-set profile is left alone. It then awaits
    ``self._handle_message(event)`` and returns its result.

    Stamping is best-effort: a failure is logged with a traceback and the
    event is still delegated, unstamped.
    """
    async def _handler(event):
        try:
            source = getattr(event, "source", None)
            # getattr with a default so a source object that simply lacks the
            # attribute is stamped instead of raising and staying unstamped.
            if source is not None and not getattr(source, "profile", None):
                source.profile = profile_name
        except Exception:
            logger.warning(
                "Could not stamp profile '%s' on an inbound event; "
                "delegating it unstamped",
                profile_name, exc_info=True,
            )
        return await self._handle_message(event)
    return _handler


@staticmethod
def _adapter_credential_fingerprint(adapter: Any) -> Optional[str]:
    """Return a stable, log-safe fingerprint of an adapter's credential.

    Used only to detect two profiles claiming the same bot token. The first
    non-blank string found among the attributes ``token``, ``bot_token``,
    ``_token``, ``api_token`` and ``_bot_token`` is stripped, prefixed with
    the fixed tag ``"hermes-mux:"`` and hashed with SHA-256; the first 16 hex
    characters are returned. The prefix is a constant, not a secret or
    per-install salt, so equal tokens always yield equal fingerprints.

    Returns:
        The 16-character fingerprint (never the token itself), or ``None``
        when no such string attribute exists, in which case the caller does
        no conflict detection for this adapter.
    """
    import hashlib

    for attr in ("token", "bot_token", "_token", "api_token", "_bot_token"):
        candidate = getattr(adapter, attr, None)
        if isinstance(candidate, str) and candidate.strip():
            digest = hashlib.sha256(
                ("hermes-mux:" + candidate.strip()).encode("utf-8")
            )
            return digest.hexdigest()[:16]
    return None
b = GatewayRunner._adapter_credential_fingerprint(_FakeAdapter(token="tok-B")) + assert a != b + + def test_reads_alt_attrs(self): + class _AltAdapter: + def __init__(self): + self.bot_token = "alt-token" + assert GatewayRunner._adapter_credential_fingerprint(_AltAdapter()) is not None + + +class TestProfileMessageHandler: + @pytest.mark.asyncio + async def test_stamps_profile_on_unstamped_source(self): + runner = GatewayRunner.__new__(GatewayRunner) + seen = {} + + async def _fake_handle(event): + seen["profile"] = event.source.profile + return "ok" + + runner._handle_message = _fake_handle + handler = runner._make_profile_message_handler("coder") + + class _Src: + profile = None + + class _Evt: + source = _Src() + + result = await handler(_Evt()) + assert result == "ok" + assert seen["profile"] == "coder" + + @pytest.mark.asyncio + async def test_does_not_override_existing_profile(self): + runner = GatewayRunner.__new__(GatewayRunner) + seen = {} + + async def _fake_handle(event): + seen["profile"] = event.source.profile + return "ok" + + runner._handle_message = _fake_handle + handler = runner._make_profile_message_handler("coder") + + class _Src: + profile = "writer" # already stamped (e.g. 
by URL prefix) + + class _Evt: + source = _Src() + + await handler(_Evt()) + assert seen["profile"] == "writer" + + +class TestPortBindingHardError: + """A secondary profile enabling a port-binding platform aborts startup.""" + + @pytest.mark.asyncio + async def test_secondary_webhook_raises(self, monkeypatch): + from gateway.run import MultiplexConfigError + from gateway.config import GatewayConfig, Platform, PlatformConfig + + runner = GatewayRunner.__new__(GatewayRunner) + runner.config = GatewayConfig(multiplex_profiles=True) + runner._profile_adapters = {} + + # reviewer profile config enables webhook (a port-binding platform) + reviewer_cfg = GatewayConfig(multiplex_profiles=True) + reviewer_cfg.platforms = { + Platform.WEBHOOK: PlatformConfig(enabled=True, extra={"port": 8644}), + } + monkeypatch.setattr( + "gateway.config.load_gateway_config", lambda: reviewer_cfg + ) + + with pytest.raises(MultiplexConfigError) as ei: + await runner._start_one_profile_adapters("reviewer", "/tmp/x", {}) + assert "webhook" in str(ei.value) + assert "reviewer" in str(ei.value) + + @pytest.mark.asyncio + async def test_secondary_non_binding_platform_ok(self, monkeypatch): + """A non-port-binding platform (e.g. telegram) is NOT rejected.""" + from gateway.config import GatewayConfig, Platform, PlatformConfig + + runner = GatewayRunner.__new__(GatewayRunner) + runner.config = GatewayConfig(multiplex_profiles=True) + runner._profile_adapters = {} + + reviewer_cfg = GatewayConfig(multiplex_profiles=True) + reviewer_cfg.platforms = { + Platform.TELEGRAM: PlatformConfig(enabled=True, token="t"), + } + monkeypatch.setattr( + "gateway.config.load_gateway_config", lambda: reviewer_cfg + ) + # _create_adapter returns None here (no real telegram token wiring), so + # the loop simply connects nothing — the key assertion is NO raise. 
+ monkeypatch.setattr(runner, "_create_adapter", lambda p, c: None) + + connected = await runner._start_one_profile_adapters("reviewer", "/tmp/x", {}) + assert connected == 0 # nothing connected, but no MultiplexConfigError + + def test_port_binding_set_covers_known_listeners(self): + from gateway.run import _PORT_BINDING_PLATFORM_VALUES + # Every adapter that binds a TCP port must be in the guard set. + for p in ("webhook", "api_server", "msgraph_webhook", "feishu", + "wecom_callback", "bluebubbles", "sms"): + assert p in _PORT_BINDING_PLATFORM_VALUES +