From d82f9fa7f7197b0a7e5246ca42802f96fbb7b734 Mon Sep 17 00:00:00 2001 From: Ben Barclay Date: Thu, 18 Jun 2026 15:50:57 +1000 Subject: [PATCH] =?UTF-8?q?feat(gateway):=20multiplex=20phase=200=20?= =?UTF-8?q?=E2=80=94=20config=20flag,=20profile=20enumeration,=20profile-s?= =?UTF-8?q?tamped=20session=20keys?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundations for serving multiple profiles from one gateway process, inert when off: - gateway.multiplex_profiles config flag (default false), round-trips through GatewayConfig and load_gateway_config (top-level + nested gateway.* form). - hermes_cli.profiles.profiles_to_serve(multiplex): the single chokepoint for which (profile, HERMES_HOME) pairs the gateway serves. Lightweight dir scan; active-profile-only when off, default + all named profiles when on. - build_session_key gains a profile= namespace slot. Default/None reuse the historical 'agent:main:...' literal BYTE-IDENTICALLY (no session migration, positional parsers unaffected); a named profile becomes 'agent::...' so two profiles on the same platform/chat never collide. - SessionStore._resolve_profile_for_key + _session_key_for_source fallback resolve the namespace from the flag (legacy when off, active profile when on). Tests: byte-identical-when-off (parametrized), namespace isolation, positional layout preserved, config round-trip, profiles_to_serve enumeration. --- gateway/config.py | 21 ++++ gateway/run.py | 11 ++ gateway/session.py | 62 ++++++++-- hermes_cli/profiles.py | 43 ++++++- tests/gateway/test_multiplex_phase0.py | 165 +++++++++++++++++++++++++ tests/hermes_cli/test_profiles.py | 46 +++++++ 6 files changed, 339 insertions(+), 9 deletions(-) create mode 100644 tests/gateway/test_multiplex_phase0.py diff --git a/gateway/config.py b/gateway/config.py index c63b9523d73..5b89c56b375 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -545,6 +545,13 @@ class GatewayConfig: thread_sessions_per_user: bool = False # When False (default), threads are shared across all participants max_concurrent_sessions: Optional[int] = None # Positive int caps simultaneous active chat sessions + # Multi-profile multiplexing (opt-in; default off preserves one-gateway-per-profile). + # When True, the default profile's gateway serves inbound messages for every + # profile on the host: profiles are stamped into session keys and (in later + # phases) per-profile adapters/credentials are resolved. When False, the + # gateway behaves exactly as before — single HERMES_HOME, no profile stamping. + multiplex_profiles: bool = False + # Unauthorized DM policy unauthorized_dm_behavior: str = "pair" # "pair" or "ignore" @@ -650,6 +657,7 @@ class GatewayConfig: "group_sessions_per_user": self.group_sessions_per_user, "thread_sessions_per_user": self.thread_sessions_per_user, "max_concurrent_sessions": self.max_concurrent_sessions, + "multiplex_profiles": self.multiplex_profiles, "unauthorized_dm_behavior": self.unauthorized_dm_behavior, "streaming": self.streaming.to_dict(), "session_store_max_age_days": self.session_store_max_age_days, @@ -695,7 +703,12 @@ class GatewayConfig: group_sessions_per_user = data.get("group_sessions_per_user") thread_sessions_per_user = data.get("thread_sessions_per_user") + multiplex_profiles = data.get("multiplex_profiles") nested_gateway = data.get("gateway") if isinstance(data.get("gateway"), dict) else {} + if multiplex_profiles is None and isinstance(nested_gateway, dict): + # Also honor gateway.multiplex_profiles written by + # ``hermes config set gateway.multiplex_profiles true``. + multiplex_profiles = nested_gateway.get("multiplex_profiles") if "max_concurrent_sessions" in data: max_concurrent_raw = data.get("max_concurrent_sessions") max_concurrent_key = "max_concurrent_sessions" @@ -732,6 +745,7 @@ class GatewayConfig: stt_enabled=_coerce_bool(stt_enabled, True), group_sessions_per_user=_coerce_bool(group_sessions_per_user, True), thread_sessions_per_user=_coerce_bool(thread_sessions_per_user, False), + multiplex_profiles=_coerce_bool(multiplex_profiles, False), max_concurrent_sessions=max_concurrent_sessions, unauthorized_dm_behavior=unauthorized_dm_behavior, streaming=StreamingConfig.from_dict(data.get("streaming", {})), @@ -823,6 +837,13 @@ def load_gateway_config() -> GatewayConfig: if "thread_sessions_per_user" in yaml_cfg: gw_data["thread_sessions_per_user"] = yaml_cfg["thread_sessions_per_user"] + # Multiplexing flag: accept both the top-level key and the nested + # gateway.multiplex_profiles form (from_dict resolves the nested + # fallback, but surface the top-level key here for parity with the + # other session-scope flags above). + if "multiplex_profiles" in yaml_cfg: + gw_data["multiplex_profiles"] = yaml_cfg["multiplex_profiles"] + gateway_section = yaml_cfg.get("gateway") if isinstance(gateway_section, dict) and "max_concurrent_sessions" in gateway_section: gw_data["max_concurrent_sessions"] = gateway_section["max_concurrent_sessions"] diff --git a/gateway/run.py b/gateway/run.py index b16110e54d4..c7037ec6b25 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2814,10 +2814,21 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except Exception: pass config = getattr(self, "config", None) + # Mirror SessionStore._resolve_profile_for_key so this fallback path + # produces the same namespace as the primary path: None (legacy + # agent:main) unless multiplexing is on, then the active profile. + _profile = None + if getattr(config, "multiplex_profiles", False): + try: + from hermes_cli.profiles import get_active_profile_name + _profile = get_active_profile_name() or "default" + except Exception: + _profile = None return build_session_key( source, group_sessions_per_user=getattr(config, "group_sessions_per_user", True), thread_sessions_per_user=getattr(config, "thread_sessions_per_user", False), + profile=_profile, ) def _telegram_topic_mode_enabled(self, source: SessionSource) -> bool: diff --git a/gateway/session.py b/gateway/session.py index f48b83fed0c..83b5ba5a812 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -615,15 +615,41 @@ def is_shared_multi_user_session( return not group_sessions_per_user +def _session_key_namespace(profile: Optional[str]) -> str: + """Return the ``agent:`` namespace prefix for a session key. + + The historical key format is ``agent:main:::...`` where + ``main`` is a static namespace literal (NOT a branch name — branching keys + off ``session_id``, not this slot). Multi-profile multiplexing reuses this + slot to carry the profile: + + - default profile (or ``None``/``""``/``"default"``) → ``agent:main`` — + BYTE-IDENTICAL to every key ever generated, so existing sessions and all + positional parsers (``parts[2]`` == platform, etc.) are unaffected. + - named profile ``coder`` → ``agent:coder`` — keeps the same positional + layout, just a different namespace, so two profiles serving the same + platform/chat never collide. + """ + if not profile or profile == "default": + return "agent:main" + return f"agent:{profile}" + + def build_session_key( source: SessionSource, group_sessions_per_user: bool = True, thread_sessions_per_user: bool = False, + profile: Optional[str] = None, ) -> str: """Build a deterministic session key from a message source. This is the single source of truth for session key construction. + ``profile`` selects the key namespace (see :func:`_session_key_namespace`). + It defaults to ``None`` ⇒ the legacy ``agent:main`` namespace, so callers + that don't multiplex produce byte-identical keys to before. Only the + multiplexing gateway passes a non-default profile. + DM rules: - DMs include chat_id when present, so each private conversation is isolated. - thread_id further differentiates threaded DMs within the same DM chat. @@ -643,6 +669,7 @@ def build_session_key( shared session per chat. - Without identifiers, messages fall back to one session per platform/chat_type. """ + ns = _session_key_namespace(profile) platform = source.platform.value if source.chat_type == "dm": dm_chat_id = source.chat_id @@ -651,12 +678,12 @@ def build_session_key( if dm_chat_id: if source.thread_id: - return f"agent:main:{platform}:dm:{dm_chat_id}:{source.thread_id}" - return f"agent:main:{platform}:dm:{dm_chat_id}" + return f"{ns}:{platform}:dm:{dm_chat_id}:{source.thread_id}" + return f"{ns}:{platform}:dm:{dm_chat_id}" # No chat_id — fall back to the sender's own identifier before the # bare per-platform sink. Without this, every DM from every user that # arrives without a chat_id (non-standard adapters / synthetic sources) - # collapses into one shared "agent:main::dm" session, and a + # collapses into one shared "::dm" session, and a # single cached agent ends up serving multiple people's conversations — # cross-user history bleed. participant_id keeps DMs isolated per user. dm_participant_id = source.user_id_alt or source.user_id @@ -667,11 +694,11 @@ def build_session_key( ) if dm_participant_id: if source.thread_id: - return f"agent:main:{platform}:dm:{dm_participant_id}:{source.thread_id}" - return f"agent:main:{platform}:dm:{dm_participant_id}" + return f"{ns}:{platform}:dm:{dm_participant_id}:{source.thread_id}" + return f"{ns}:{platform}:dm:{dm_participant_id}" if source.thread_id: - return f"agent:main:{platform}:dm:{source.thread_id}" - return f"agent:main:{platform}:dm" + return f"{ns}:{platform}:dm:{source.thread_id}" + return f"{ns}:{platform}:dm" participant_id = source.user_id_alt or source.user_id if participant_id and source.platform == Platform.WHATSAPP: @@ -679,7 +706,7 @@ def build_session_key( # single group member gets two isolated per-user sessions when the # bridge reshuffles alias forms. participant_id = canonical_whatsapp_identifier(str(participant_id)) or participant_id - key_parts = ["agent:main", platform, source.chat_type] + key_parts = [ns, platform, source.chat_type] if source.chat_id: key_parts.append(source.chat_id) @@ -775,12 +802,31 @@ class SessionStore: logger.debug("Could not remove temp file %s: %s", tmp_path, e) raise + def _resolve_profile_for_key(self) -> Optional[str]: + """Return the profile namespace for session keys, or None when off. + + Phase 0: when ``multiplex_profiles`` is disabled (default), returns + ``None`` so keys stay in the legacy ``agent:main`` namespace — + byte-identical to before. When enabled, returns the active profile name + so this store's keys are namespaced to it. Per-source profile + attribution (one store serving many profiles) arrives in a later phase; + until then the active profile is the correct namespace. + """ + if not getattr(self.config, "multiplex_profiles", False): + return None + try: + from hermes_cli.profiles import get_active_profile_name + return get_active_profile_name() or "default" + except Exception: + return None + def _generate_session_key(self, source: SessionSource) -> str: """Generate a session key from a source.""" return build_session_key( source, group_sessions_per_user=getattr(self.config, "group_sessions_per_user", True), thread_sessions_per_user=getattr(self.config, "thread_sessions_per_user", False), + profile=self._resolve_profile_for_key(), ) def _is_session_expired(self, entry: SessionEntry) -> bool: diff --git a/hermes_cli/profiles.py b/hermes_cli/profiles.py index 881dd481445..490077884e5 100644 --- a/hermes_cli/profiles.py +++ b/hermes_cli/profiles.py @@ -29,7 +29,7 @@ import subprocess import sys from dataclasses import dataclass from pathlib import Path, PurePosixPath, PureWindowsPath -from typing import List, Optional +from typing import List, Optional, Tuple from agent.skill_utils import is_excluded_skill_path @@ -781,6 +781,47 @@ def list_profiles() -> List[ProfileInfo]: return profiles +def profiles_to_serve(multiplex: bool) -> List[Tuple[str, Path]]: + """Return the ``(profile_name, hermes_home)`` pairs a gateway should serve. + + This is the single chokepoint for "which profiles does the inbound gateway + handle" so later multiplexing phases never re-derive the set. + + - ``multiplex=False`` (default): returns exactly one entry for the *active* + profile — byte-for-byte the single-profile behavior the gateway has + always had. The name is ``"default"`` for the default profile or the + active named profile's id. + - ``multiplex=True``: returns the default profile plus every valid named + profile under ``profiles/``, each paired with its own HERMES_HOME. + + Intentionally lightweight (a directory scan + name validation only): no + per-profile config reads, gateway-running probes, or skill counts like + :func:`list_profiles`. It runs on gateway startup and must stay cheap. + + The returned ``hermes_home`` is the path to pass to + ``set_hermes_home_override`` when scoping a turn to that profile. + """ + active = get_active_profile_name() or "default" + if not multiplex: + return [(active, get_profile_dir(active))] + + serve: List[Tuple[str, Path]] = [("default", _get_default_hermes_home())] + + profiles_root = _get_profiles_root() + if profiles_root.is_dir(): + for entry in sorted(profiles_root.iterdir()): + if not entry.is_dir(): + continue + name = entry.name + if name == "default": + continue # default is the built-in entry already added above + if not _PROFILE_ID_RE.match(name): + continue + serve.append((name, entry)) + + return serve + + def create_profile( name: str, clone_from: Optional[str] = None, diff --git a/tests/gateway/test_multiplex_phase0.py b/tests/gateway/test_multiplex_phase0.py new file mode 100644 index 00000000000..0297b08494c --- /dev/null +++ b/tests/gateway/test_multiplex_phase0.py @@ -0,0 +1,165 @@ +"""Phase 0 foundations for multi-profile gateway multiplexing. + +Covers the three Phase 0 deliverables: + 1. ``gateway.multiplex_profiles`` config flag (default False, round-trips). + 2. ``hermes_cli.profiles.profiles_to_serve`` enumeration. + 3. Profile-stamped ``build_session_key`` that is BYTE-IDENTICAL when the + flag is off (the orphan-every-session guard) and namespace-segmented when + on, without disturbing the positional key layout downstream parsers rely + on. +""" +import pytest +from unittest.mock import patch + +from gateway.config import GatewayConfig, Platform +from gateway.session import SessionSource, SessionStore, build_session_key + + +def _src(**kw) -> SessionSource: + kw.setdefault("platform", Platform.TELEGRAM) + kw.setdefault("chat_id", "99") + kw.setdefault("chat_type", "dm") + return SessionSource(**kw) + + +class TestSessionKeyByteIdenticalWhenOff: + """The non-negotiable guard: with no profile (or 'default'), every key is + byte-for-byte what it was before Phase 0. A diff here orphans every + existing session on upgrade.""" + + @pytest.mark.parametrize("profile", [None, "default"]) + def test_dm_with_chat_id(self, profile): + s = _src(chat_id="99", chat_type="dm") + assert build_session_key(s, profile=profile) == "agent:main:telegram:dm:99" + + @pytest.mark.parametrize("profile", [None, "default"]) + def test_dm_with_thread(self, profile): + s = _src(chat_id="99", chat_type="dm", thread_id="t1") + assert build_session_key(s, profile=profile) == "agent:main:telegram:dm:99:t1" + + @pytest.mark.parametrize("profile", [None, "default"]) + def test_dm_without_chat_id_falls_back_to_user(self, profile): + s = _src(chat_id="", chat_type="dm", user_id="jordan") + assert build_session_key(s, profile=profile) == "agent:main:telegram:dm:jordan" + + @pytest.mark.parametrize("profile", [None, "default"]) + def test_group_per_user(self, profile): + s = _src(platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice") + assert ( + build_session_key(s, profile=profile) + == "agent:main:discord:group:g1:alice" + ) + + @pytest.mark.parametrize("profile", [None, "default"]) + def test_group_shared_when_disabled(self, profile): + s = _src(platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice") + assert ( + build_session_key(s, group_sessions_per_user=False, profile=profile) + == "agent:main:discord:group:g1" + ) + + +class TestSessionKeyNamespacedWhenOn: + """A named profile occupies the namespace slot, isolating its sessions.""" + + def test_named_profile_dm(self): + s = _src(chat_id="99", chat_type="dm") + assert build_session_key(s, profile="coder") == "agent:coder:telegram:dm:99" + + def test_named_profile_group_per_user(self): + s = _src(platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice") + assert ( + build_session_key(s, profile="coder") + == "agent:coder:discord:group:g1:alice" + ) + + def test_two_profiles_same_chat_do_not_collide(self): + s = _src(chat_id="99", chat_type="dm") + a = build_session_key(s, profile="default") + b = build_session_key(s, profile="coder") + c = build_session_key(s, profile="writer") + assert a != b != c and a != c + + def test_positional_layout_preserved_for_parsers(self): + """Downstream parsers split on ':' and read parts[2]=platform, + parts[3]=chat_type, parts[4]=chat_id (see qqbot adapter + _parse_gateway_session_key). The profile must occupy parts[1] only.""" + s = _src(platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice") + parts = build_session_key(s, profile="coder").split(":") + assert parts[0] == "agent" + assert parts[1] == "coder" # namespace slot (was always 'main') + assert parts[2] == "discord" # platform — unchanged offset + assert parts[3] == "group" # chat_type — unchanged offset + assert parts[4] == "g1" # chat_id — unchanged offset + + def test_default_namespace_layout_matches_named(self): + """Default and named keys differ ONLY in parts[1].""" + s = _src(platform=Platform.SLACK, chat_id="c1", chat_type="channel", user_id="u1") + d = build_session_key(s, profile="default").split(":") + n = build_session_key(s, profile="coder").split(":") + assert d[0] == n[0] == "agent" + assert d[1] == "main" and n[1] == "coder" + assert d[2:] == n[2:] # everything after the namespace is identical + + +class TestMultiplexConfigFlag: + """gateway.multiplex_profiles defaults off and round-trips.""" + + def test_default_is_false(self): + assert GatewayConfig().multiplex_profiles is False + + def test_to_dict_includes_flag(self): + assert GatewayConfig().to_dict()["multiplex_profiles"] is False + + def test_from_dict_top_level(self): + cfg = GatewayConfig.from_dict({"multiplex_profiles": True}) + assert cfg.multiplex_profiles is True + + def test_from_dict_nested_gateway(self): + cfg = GatewayConfig.from_dict({"gateway": {"multiplex_profiles": True}}) + assert cfg.multiplex_profiles is True + + def test_from_dict_coerces_truthy_string(self): + cfg = GatewayConfig.from_dict({"multiplex_profiles": "true"}) + assert cfg.multiplex_profiles is True + + def test_roundtrip(self): + cfg = GatewayConfig.from_dict(GatewayConfig(multiplex_profiles=True).to_dict()) + assert cfg.multiplex_profiles is True + + +class TestSessionStoreProfileResolution: + """SessionStore._generate_session_key honors the flag: legacy namespace + when off, active-profile namespace when on.""" + + def _store(self, tmp_path, **cfg_kw): + config = GatewayConfig(**cfg_kw) + with patch("gateway.session.SessionStore._ensure_loaded"): + s = SessionStore(sessions_dir=tmp_path, config=config) + s._db = None + s._loaded = True + return s + + def test_flag_off_uses_legacy_namespace(self, tmp_path): + store = self._store(tmp_path) # multiplex_profiles defaults False + s = _src(chat_id="99", chat_type="dm") + assert store._generate_session_key(s) == "agent:main:telegram:dm:99" + assert store._generate_session_key(s) == build_session_key(s) + + def test_flag_off_resolve_profile_is_none(self, tmp_path): + store = self._store(tmp_path) + assert store._resolve_profile_for_key() is None + + def test_flag_on_uses_active_profile_namespace(self, tmp_path): + store = self._store(tmp_path, multiplex_profiles=True) + s = _src(chat_id="99", chat_type="dm") + with patch("hermes_cli.profiles.get_active_profile_name", return_value="coder"): + assert store._generate_session_key(s) == "agent:coder:telegram:dm:99" + + def test_flag_on_default_profile_stays_legacy(self, tmp_path): + store = self._store(tmp_path, multiplex_profiles=True) + s = _src(chat_id="99", chat_type="dm") + with patch("hermes_cli.profiles.get_active_profile_name", return_value="default"): + assert store._generate_session_key(s) == "agent:main:telegram:dm:99" + + diff --git a/tests/hermes_cli/test_profiles.py b/tests/hermes_cli/test_profiles.py index 1ea1845d9d3..59afe84e563 100644 --- a/tests/hermes_cli/test_profiles.py +++ b/tests/hermes_cli/test_profiles.py @@ -35,6 +35,7 @@ from hermes_cli.profiles import ( has_bundled_skills_opt_out, NO_BUNDLED_SKILLS_MARKER, backfill_profile_envs, + profiles_to_serve, ) from hermes_cli.config import DEFAULT_CONFIG @@ -1487,3 +1488,48 @@ class TestEdgeCases: delete_profile("coder", yes=True) assert get_active_profile() == "default" + + +class TestProfilesToServe: + """profiles_to_serve(multiplex) — the gateway's profile-enumeration chokepoint.""" + + def test_off_returns_only_active_default(self, profile_env): + serve = profiles_to_serve(multiplex=False) + assert len(serve) == 1 + name, home = serve[0] + assert name == "default" + assert home == _get_default_hermes_home() + + def test_off_returns_only_active_named(self, profile_env, monkeypatch): + # A named profile's gateway runs with HERMES_HOME pointing at the + # profile dir; get_active_profile_name() infers the name from there. + create_profile("coder", no_alias=True) + monkeypatch.setenv("HERMES_HOME", str(get_profile_dir("coder"))) + serve = profiles_to_serve(multiplex=False) + assert len(serve) == 1 + assert serve[0][0] == "coder" + assert serve[0][1] == get_profile_dir("coder") + + def test_on_returns_default_plus_all_named(self, profile_env): + create_profile("coder", no_alias=True) + create_profile("writer", no_alias=True) + serve = dict(profiles_to_serve(multiplex=True)) + assert set(serve) == {"default", "coder", "writer"} + assert serve["default"] == _get_default_hermes_home() + assert serve["coder"] == get_profile_dir("coder") + + def test_on_default_always_first(self, profile_env): + create_profile("coder", no_alias=True) + serve = profiles_to_serve(multiplex=True) + assert serve[0][0] == "default" + + def test_on_active_profile_does_not_change_set(self, profile_env): + """Enumeration is independent of which profile is active.""" + create_profile("coder", no_alias=True) + set_active_profile("coder") + serve = dict(profiles_to_serve(multiplex=True)) + assert set(serve) == {"default", "coder"} + + def test_on_no_named_profiles_returns_just_default(self, profile_env): + serve = profiles_to_serve(multiplex=True) + assert [n for n, _ in serve] == ["default"]