feat(gateway): multiplex phase 0 — config flag, profile enumeration, profile-stamped session keys

Foundations for serving multiple profiles from one gateway process, inert
when off:

- gateway.multiplex_profiles config flag (default false), round-trips through
  GatewayConfig and load_gateway_config (top-level + nested gateway.* form).
- hermes_cli.profiles.profiles_to_serve(multiplex): the single chokepoint for
  which (profile, HERMES_HOME) pairs the gateway serves. Lightweight dir scan;
  active-profile-only when off, default + all named profiles when on.
- build_session_key gains a profile= namespace slot. Default/None reuse the
  historical 'agent:main:...' literal BYTE-IDENTICALLY (no session migration,
  positional parsers unaffected); a named profile becomes 'agent:<profile>:...'
  so two profiles on the same platform/chat never collide.
- SessionStore._resolve_profile_for_key + _session_key_for_source fallback
  resolve the namespace from the flag (legacy when off, active profile when on).

Tests: byte-identical-when-off (parametrized), namespace isolation, positional
layout preserved, config round-trip, profiles_to_serve enumeration.
This commit is contained in:
Ben Barclay 2026-06-18 15:50:57 +10:00 committed by Teknium
parent 9e1f616136
commit d82f9fa7f7
6 changed files with 339 additions and 9 deletions

View file

@ -545,6 +545,13 @@ class GatewayConfig:
thread_sessions_per_user: bool = False # When False (default), threads are shared across all participants
max_concurrent_sessions: Optional[int] = None # Positive int caps simultaneous active chat sessions
# Multi-profile multiplexing (opt-in; default off preserves one-gateway-per-profile).
# When True, the default profile's gateway serves inbound messages for every
# profile on the host: profiles are stamped into session keys and (in later
# phases) per-profile adapters/credentials are resolved. When False, the
# gateway behaves exactly as before — single HERMES_HOME, no profile stamping.
multiplex_profiles: bool = False
# Unauthorized DM policy
unauthorized_dm_behavior: str = "pair" # "pair" or "ignore"
@ -650,6 +657,7 @@ class GatewayConfig:
"group_sessions_per_user": self.group_sessions_per_user,
"thread_sessions_per_user": self.thread_sessions_per_user,
"max_concurrent_sessions": self.max_concurrent_sessions,
"multiplex_profiles": self.multiplex_profiles,
"unauthorized_dm_behavior": self.unauthorized_dm_behavior,
"streaming": self.streaming.to_dict(),
"session_store_max_age_days": self.session_store_max_age_days,
@ -695,7 +703,12 @@ class GatewayConfig:
group_sessions_per_user = data.get("group_sessions_per_user")
thread_sessions_per_user = data.get("thread_sessions_per_user")
multiplex_profiles = data.get("multiplex_profiles")
nested_gateway = data.get("gateway") if isinstance(data.get("gateway"), dict) else {}
if multiplex_profiles is None and isinstance(nested_gateway, dict):
# Also honor gateway.multiplex_profiles written by
# ``hermes config set gateway.multiplex_profiles true``.
multiplex_profiles = nested_gateway.get("multiplex_profiles")
if "max_concurrent_sessions" in data:
max_concurrent_raw = data.get("max_concurrent_sessions")
max_concurrent_key = "max_concurrent_sessions"
@ -732,6 +745,7 @@ class GatewayConfig:
stt_enabled=_coerce_bool(stt_enabled, True),
group_sessions_per_user=_coerce_bool(group_sessions_per_user, True),
thread_sessions_per_user=_coerce_bool(thread_sessions_per_user, False),
multiplex_profiles=_coerce_bool(multiplex_profiles, False),
max_concurrent_sessions=max_concurrent_sessions,
unauthorized_dm_behavior=unauthorized_dm_behavior,
streaming=StreamingConfig.from_dict(data.get("streaming", {})),
@ -823,6 +837,13 @@ def load_gateway_config() -> GatewayConfig:
if "thread_sessions_per_user" in yaml_cfg:
gw_data["thread_sessions_per_user"] = yaml_cfg["thread_sessions_per_user"]
# Multiplexing flag: accept both the top-level key and the nested
# gateway.multiplex_profiles form (from_dict resolves the nested
# fallback, but surface the top-level key here for parity with the
# other session-scope flags above).
if "multiplex_profiles" in yaml_cfg:
gw_data["multiplex_profiles"] = yaml_cfg["multiplex_profiles"]
gateway_section = yaml_cfg.get("gateway")
if isinstance(gateway_section, dict) and "max_concurrent_sessions" in gateway_section:
gw_data["max_concurrent_sessions"] = gateway_section["max_concurrent_sessions"]

View file

@ -2814,10 +2814,21 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
except Exception:
pass
config = getattr(self, "config", None)
# Mirror SessionStore._resolve_profile_for_key so this fallback path
# produces the same namespace as the primary path: None (legacy
# agent:main) unless multiplexing is on, then the active profile.
_profile = None
if getattr(config, "multiplex_profiles", False):
try:
from hermes_cli.profiles import get_active_profile_name
_profile = get_active_profile_name() or "default"
except Exception:
_profile = None
return build_session_key(
source,
group_sessions_per_user=getattr(config, "group_sessions_per_user", True),
thread_sessions_per_user=getattr(config, "thread_sessions_per_user", False),
profile=_profile,
)
def _telegram_topic_mode_enabled(self, source: SessionSource) -> bool:

View file

@ -615,15 +615,41 @@ def is_shared_multi_user_session(
return not group_sessions_per_user
def _session_key_namespace(profile: Optional[str]) -> str:
"""Return the ``agent:<ns>`` namespace prefix for a session key.
The historical key format is ``agent:main:<platform>:<chat_type>:...`` where
``main`` is a static namespace literal (NOT a branch name branching keys
off ``session_id``, not this slot). Multi-profile multiplexing reuses this
slot to carry the profile:
- default profile (or ``None``/``""``/``"default"``) ``agent:main``
BYTE-IDENTICAL to every key ever generated, so existing sessions and all
positional parsers (``parts[2]`` == platform, etc.) are unaffected.
- named profile ``coder`` ``agent:coder`` keeps the same positional
layout, just a different namespace, so two profiles serving the same
platform/chat never collide.
"""
if not profile or profile == "default":
return "agent:main"
return f"agent:{profile}"
def build_session_key(
source: SessionSource,
group_sessions_per_user: bool = True,
thread_sessions_per_user: bool = False,
profile: Optional[str] = None,
) -> str:
"""Build a deterministic session key from a message source.
This is the single source of truth for session key construction.
``profile`` selects the key namespace (see :func:`_session_key_namespace`).
It defaults to ``None`` the legacy ``agent:main`` namespace, so callers
that don't multiplex produce byte-identical keys to before. Only the
multiplexing gateway passes a non-default profile.
DM rules:
- DMs include chat_id when present, so each private conversation is isolated.
- thread_id further differentiates threaded DMs within the same DM chat.
@ -643,6 +669,7 @@ def build_session_key(
shared session per chat.
- Without identifiers, messages fall back to one session per platform/chat_type.
"""
ns = _session_key_namespace(profile)
platform = source.platform.value
if source.chat_type == "dm":
dm_chat_id = source.chat_id
@ -651,12 +678,12 @@ def build_session_key(
if dm_chat_id:
if source.thread_id:
return f"agent:main:{platform}:dm:{dm_chat_id}:{source.thread_id}"
return f"agent:main:{platform}:dm:{dm_chat_id}"
return f"{ns}:{platform}:dm:{dm_chat_id}:{source.thread_id}"
return f"{ns}:{platform}:dm:{dm_chat_id}"
# No chat_id — fall back to the sender's own identifier before the
# bare per-platform sink. Without this, every DM from every user that
# arrives without a chat_id (non-standard adapters / synthetic sources)
# collapses into one shared "agent:main:<platform>:dm" session, and a
# collapses into one shared "<ns>:<platform>:dm" session, and a
# single cached agent ends up serving multiple people's conversations —
# cross-user history bleed. participant_id keeps DMs isolated per user.
dm_participant_id = source.user_id_alt or source.user_id
@ -667,11 +694,11 @@ def build_session_key(
)
if dm_participant_id:
if source.thread_id:
return f"agent:main:{platform}:dm:{dm_participant_id}:{source.thread_id}"
return f"agent:main:{platform}:dm:{dm_participant_id}"
return f"{ns}:{platform}:dm:{dm_participant_id}:{source.thread_id}"
return f"{ns}:{platform}:dm:{dm_participant_id}"
if source.thread_id:
return f"agent:main:{platform}:dm:{source.thread_id}"
return f"agent:main:{platform}:dm"
return f"{ns}:{platform}:dm:{source.thread_id}"
return f"{ns}:{platform}:dm"
participant_id = source.user_id_alt or source.user_id
if participant_id and source.platform == Platform.WHATSAPP:
@ -679,7 +706,7 @@ def build_session_key(
# single group member gets two isolated per-user sessions when the
# bridge reshuffles alias forms.
participant_id = canonical_whatsapp_identifier(str(participant_id)) or participant_id
key_parts = ["agent:main", platform, source.chat_type]
key_parts = [ns, platform, source.chat_type]
if source.chat_id:
key_parts.append(source.chat_id)
@ -775,12 +802,31 @@ class SessionStore:
logger.debug("Could not remove temp file %s: %s", tmp_path, e)
raise
def _resolve_profile_for_key(self) -> Optional[str]:
"""Return the profile namespace for session keys, or None when off.
Phase 0: when ``multiplex_profiles`` is disabled (default), returns
``None`` so keys stay in the legacy ``agent:main`` namespace
byte-identical to before. When enabled, returns the active profile name
so this store's keys are namespaced to it. Per-source profile
attribution (one store serving many profiles) arrives in a later phase;
until then the active profile is the correct namespace.
"""
if not getattr(self.config, "multiplex_profiles", False):
return None
try:
from hermes_cli.profiles import get_active_profile_name
return get_active_profile_name() or "default"
except Exception:
return None
def _generate_session_key(self, source: SessionSource) -> str:
"""Generate a session key from a source."""
return build_session_key(
source,
group_sessions_per_user=getattr(self.config, "group_sessions_per_user", True),
thread_sessions_per_user=getattr(self.config, "thread_sessions_per_user", False),
profile=self._resolve_profile_for_key(),
)
def _is_session_expired(self, entry: SessionEntry) -> bool:

View file

@ -29,7 +29,7 @@ import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path, PurePosixPath, PureWindowsPath
from typing import List, Optional
from typing import List, Optional, Tuple
from agent.skill_utils import is_excluded_skill_path
@ -781,6 +781,47 @@ def list_profiles() -> List[ProfileInfo]:
return profiles
def profiles_to_serve(multiplex: bool) -> List[Tuple[str, Path]]:
"""Return the ``(profile_name, hermes_home)`` pairs a gateway should serve.
This is the single chokepoint for "which profiles does the inbound gateway
handle" so later multiplexing phases never re-derive the set.
- ``multiplex=False`` (default): returns exactly one entry for the *active*
profile byte-for-byte the single-profile behavior the gateway has
always had. The name is ``"default"`` for the default profile or the
active named profile's id.
- ``multiplex=True``: returns the default profile plus every valid named
profile under ``profiles/``, each paired with its own HERMES_HOME.
Intentionally lightweight (a directory scan + name validation only): no
per-profile config reads, gateway-running probes, or skill counts like
:func:`list_profiles`. It runs on gateway startup and must stay cheap.
The returned ``hermes_home`` is the path to pass to
``set_hermes_home_override`` when scoping a turn to that profile.
"""
active = get_active_profile_name() or "default"
if not multiplex:
return [(active, get_profile_dir(active))]
serve: List[Tuple[str, Path]] = [("default", _get_default_hermes_home())]
profiles_root = _get_profiles_root()
if profiles_root.is_dir():
for entry in sorted(profiles_root.iterdir()):
if not entry.is_dir():
continue
name = entry.name
if name == "default":
continue # default is the built-in entry already added above
if not _PROFILE_ID_RE.match(name):
continue
serve.append((name, entry))
return serve
def create_profile(
name: str,
clone_from: Optional[str] = None,

View file

@ -0,0 +1,165 @@
"""Phase 0 foundations for multi-profile gateway multiplexing.
Covers the three Phase 0 deliverables:
1. ``gateway.multiplex_profiles`` config flag (default False, round-trips).
2. ``hermes_cli.profiles.profiles_to_serve`` enumeration.
3. Profile-stamped ``build_session_key`` that is BYTE-IDENTICAL when the
flag is off (the orphan-every-session guard) and namespace-segmented when
on, without disturbing the positional key layout downstream parsers rely
on.
"""
import pytest
from unittest.mock import patch
from gateway.config import GatewayConfig, Platform
from gateway.session import SessionSource, SessionStore, build_session_key
def _src(**kw) -> SessionSource:
kw.setdefault("platform", Platform.TELEGRAM)
kw.setdefault("chat_id", "99")
kw.setdefault("chat_type", "dm")
return SessionSource(**kw)
class TestSessionKeyByteIdenticalWhenOff:
"""The non-negotiable guard: with no profile (or 'default'), every key is
byte-for-byte what it was before Phase 0. A diff here orphans every
existing session on upgrade."""
@pytest.mark.parametrize("profile", [None, "default"])
def test_dm_with_chat_id(self, profile):
s = _src(chat_id="99", chat_type="dm")
assert build_session_key(s, profile=profile) == "agent:main:telegram:dm:99"
@pytest.mark.parametrize("profile", [None, "default"])
def test_dm_with_thread(self, profile):
s = _src(chat_id="99", chat_type="dm", thread_id="t1")
assert build_session_key(s, profile=profile) == "agent:main:telegram:dm:99:t1"
@pytest.mark.parametrize("profile", [None, "default"])
def test_dm_without_chat_id_falls_back_to_user(self, profile):
s = _src(chat_id="", chat_type="dm", user_id="jordan")
assert build_session_key(s, profile=profile) == "agent:main:telegram:dm:jordan"
@pytest.mark.parametrize("profile", [None, "default"])
def test_group_per_user(self, profile):
s = _src(platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice")
assert (
build_session_key(s, profile=profile)
== "agent:main:discord:group:g1:alice"
)
@pytest.mark.parametrize("profile", [None, "default"])
def test_group_shared_when_disabled(self, profile):
s = _src(platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice")
assert (
build_session_key(s, group_sessions_per_user=False, profile=profile)
== "agent:main:discord:group:g1"
)
class TestSessionKeyNamespacedWhenOn:
"""A named profile occupies the namespace slot, isolating its sessions."""
def test_named_profile_dm(self):
s = _src(chat_id="99", chat_type="dm")
assert build_session_key(s, profile="coder") == "agent:coder:telegram:dm:99"
def test_named_profile_group_per_user(self):
s = _src(platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice")
assert (
build_session_key(s, profile="coder")
== "agent:coder:discord:group:g1:alice"
)
def test_two_profiles_same_chat_do_not_collide(self):
s = _src(chat_id="99", chat_type="dm")
a = build_session_key(s, profile="default")
b = build_session_key(s, profile="coder")
c = build_session_key(s, profile="writer")
assert a != b != c and a != c
def test_positional_layout_preserved_for_parsers(self):
"""Downstream parsers split on ':' and read parts[2]=platform,
parts[3]=chat_type, parts[4]=chat_id (see qqbot adapter
_parse_gateway_session_key). The profile must occupy parts[1] only."""
s = _src(platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice")
parts = build_session_key(s, profile="coder").split(":")
assert parts[0] == "agent"
assert parts[1] == "coder" # namespace slot (was always 'main')
assert parts[2] == "discord" # platform — unchanged offset
assert parts[3] == "group" # chat_type — unchanged offset
assert parts[4] == "g1" # chat_id — unchanged offset
def test_default_namespace_layout_matches_named(self):
"""Default and named keys differ ONLY in parts[1]."""
s = _src(platform=Platform.SLACK, chat_id="c1", chat_type="channel", user_id="u1")
d = build_session_key(s, profile="default").split(":")
n = build_session_key(s, profile="coder").split(":")
assert d[0] == n[0] == "agent"
assert d[1] == "main" and n[1] == "coder"
assert d[2:] == n[2:] # everything after the namespace is identical
class TestMultiplexConfigFlag:
"""gateway.multiplex_profiles defaults off and round-trips."""
def test_default_is_false(self):
assert GatewayConfig().multiplex_profiles is False
def test_to_dict_includes_flag(self):
assert GatewayConfig().to_dict()["multiplex_profiles"] is False
def test_from_dict_top_level(self):
cfg = GatewayConfig.from_dict({"multiplex_profiles": True})
assert cfg.multiplex_profiles is True
def test_from_dict_nested_gateway(self):
cfg = GatewayConfig.from_dict({"gateway": {"multiplex_profiles": True}})
assert cfg.multiplex_profiles is True
def test_from_dict_coerces_truthy_string(self):
cfg = GatewayConfig.from_dict({"multiplex_profiles": "true"})
assert cfg.multiplex_profiles is True
def test_roundtrip(self):
cfg = GatewayConfig.from_dict(GatewayConfig(multiplex_profiles=True).to_dict())
assert cfg.multiplex_profiles is True
class TestSessionStoreProfileResolution:
"""SessionStore._generate_session_key honors the flag: legacy namespace
when off, active-profile namespace when on."""
def _store(self, tmp_path, **cfg_kw):
config = GatewayConfig(**cfg_kw)
with patch("gateway.session.SessionStore._ensure_loaded"):
s = SessionStore(sessions_dir=tmp_path, config=config)
s._db = None
s._loaded = True
return s
def test_flag_off_uses_legacy_namespace(self, tmp_path):
store = self._store(tmp_path) # multiplex_profiles defaults False
s = _src(chat_id="99", chat_type="dm")
assert store._generate_session_key(s) == "agent:main:telegram:dm:99"
assert store._generate_session_key(s) == build_session_key(s)
def test_flag_off_resolve_profile_is_none(self, tmp_path):
store = self._store(tmp_path)
assert store._resolve_profile_for_key() is None
def test_flag_on_uses_active_profile_namespace(self, tmp_path):
store = self._store(tmp_path, multiplex_profiles=True)
s = _src(chat_id="99", chat_type="dm")
with patch("hermes_cli.profiles.get_active_profile_name", return_value="coder"):
assert store._generate_session_key(s) == "agent:coder:telegram:dm:99"
def test_flag_on_default_profile_stays_legacy(self, tmp_path):
store = self._store(tmp_path, multiplex_profiles=True)
s = _src(chat_id="99", chat_type="dm")
with patch("hermes_cli.profiles.get_active_profile_name", return_value="default"):
assert store._generate_session_key(s) == "agent:main:telegram:dm:99"

View file

@ -35,6 +35,7 @@ from hermes_cli.profiles import (
has_bundled_skills_opt_out,
NO_BUNDLED_SKILLS_MARKER,
backfill_profile_envs,
profiles_to_serve,
)
from hermes_cli.config import DEFAULT_CONFIG
@ -1487,3 +1488,48 @@ class TestEdgeCases:
delete_profile("coder", yes=True)
assert get_active_profile() == "default"
class TestProfilesToServe:
"""profiles_to_serve(multiplex) — the gateway's profile-enumeration chokepoint."""
def test_off_returns_only_active_default(self, profile_env):
serve = profiles_to_serve(multiplex=False)
assert len(serve) == 1
name, home = serve[0]
assert name == "default"
assert home == _get_default_hermes_home()
def test_off_returns_only_active_named(self, profile_env, monkeypatch):
# A named profile's gateway runs with HERMES_HOME pointing at the
# profile dir; get_active_profile_name() infers the name from there.
create_profile("coder", no_alias=True)
monkeypatch.setenv("HERMES_HOME", str(get_profile_dir("coder")))
serve = profiles_to_serve(multiplex=False)
assert len(serve) == 1
assert serve[0][0] == "coder"
assert serve[0][1] == get_profile_dir("coder")
def test_on_returns_default_plus_all_named(self, profile_env):
create_profile("coder", no_alias=True)
create_profile("writer", no_alias=True)
serve = dict(profiles_to_serve(multiplex=True))
assert set(serve) == {"default", "coder", "writer"}
assert serve["default"] == _get_default_hermes_home()
assert serve["coder"] == get_profile_dir("coder")
def test_on_default_always_first(self, profile_env):
create_profile("coder", no_alias=True)
serve = profiles_to_serve(multiplex=True)
assert serve[0][0] == "default"
def test_on_active_profile_does_not_change_set(self, profile_env):
"""Enumeration is independent of which profile is active."""
create_profile("coder", no_alias=True)
set_active_profile("coder")
serve = dict(profiles_to_serve(multiplex=True))
assert set(serve) == {"default", "coder"}
def test_on_no_named_profiles_returns_just_default(self, profile_env):
serve = profiles_to_serve(multiplex=True)
assert [n for n, _ in serve] == ["default"]