feat: shared thread sessions by default — multi-user thread support (#5391)

Threads (Telegram forum topics, Discord threads, Slack threads) now default
to shared sessions where all participants see the same conversation. This is
the expected UX for threaded conversations where multiple users @mention the
bot and interact collaboratively.

Changes:
- build_session_key(): when thread_id is present, user_id is no longer
  appended to the session key (threads are shared by default)
- New config: thread_sessions_per_user (default: false) — opt-in to restore
  per-user isolation in threads if needed
- Sender attribution: messages in shared threads are prefixed with
  [sender name] so the agent can tell participants apart
- System prompt: shared threads show 'Multi-user thread' note instead of
  a per-turn User line (avoids busting prompt cache)
- Wired through all callers: gateway/run.py, base.py, telegram.py, feishu.py
- Regular group messages (no thread) remain per-user isolated (unchanged)
- DM threads are unaffected (they have their own keying logic)

Closes community request from demontut_ re: thread-based shared sessions.
This commit is contained in:
Teknium 2026-04-05 19:46:58 -07:00 committed by GitHub
parent 43d468cea8
commit 89c812d1d2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 233 additions and 7 deletions

View file

@ -246,6 +246,7 @@ class GatewayConfig:
# Session isolation in shared chats
group_sessions_per_user: bool = True # Isolate group/channel sessions per participant when user IDs are available
thread_sessions_per_user: bool = False # When False (default), threads are shared across all participants
# Unauthorized DM policy
unauthorized_dm_behavior: str = "pair" # "pair" or "ignore"
@ -333,6 +334,7 @@ class GatewayConfig:
"always_log_local": self.always_log_local,
"stt_enabled": self.stt_enabled,
"group_sessions_per_user": self.group_sessions_per_user,
"thread_sessions_per_user": self.thread_sessions_per_user,
"unauthorized_dm_behavior": self.unauthorized_dm_behavior,
"streaming": self.streaming.to_dict(),
}
@ -376,6 +378,7 @@ class GatewayConfig:
stt_enabled = data.get("stt", {}).get("enabled") if isinstance(data.get("stt"), dict) else None
group_sessions_per_user = data.get("group_sessions_per_user")
thread_sessions_per_user = data.get("thread_sessions_per_user")
unauthorized_dm_behavior = _normalize_unauthorized_dm_behavior(
data.get("unauthorized_dm_behavior"),
"pair",
@ -392,6 +395,7 @@ class GatewayConfig:
always_log_local=data.get("always_log_local", True),
stt_enabled=_coerce_bool(stt_enabled, True),
group_sessions_per_user=_coerce_bool(group_sessions_per_user, True),
thread_sessions_per_user=_coerce_bool(thread_sessions_per_user, False),
unauthorized_dm_behavior=unauthorized_dm_behavior,
streaming=StreamingConfig.from_dict(data.get("streaming", {})),
)
@ -467,6 +471,9 @@ def load_gateway_config() -> GatewayConfig:
if "group_sessions_per_user" in yaml_cfg:
gw_data["group_sessions_per_user"] = yaml_cfg["group_sessions_per_user"]
if "thread_sessions_per_user" in yaml_cfg:
gw_data["thread_sessions_per_user"] = yaml_cfg["thread_sessions_per_user"]
streaming_cfg = yaml_cfg.get("streaming")
if isinstance(streaming_cfg, dict):
gw_data["streaming"] = streaming_cfg

View file

@ -1038,6 +1038,7 @@ class BasePlatformAdapter(ABC):
session_key = build_session_key(
event.source,
group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False),
)
# Check if there's already an active handler for this session

View file

@ -1887,6 +1887,7 @@ class FeishuAdapter(BasePlatformAdapter):
session_key = build_session_key(
event.source,
group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False),
)
return f"{session_key}:media:{event.message_type.value}"
@ -2163,6 +2164,7 @@ class FeishuAdapter(BasePlatformAdapter):
return build_session_key(
event.source,
group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False),
)
@staticmethod

View file

@ -1711,6 +1711,7 @@ class TelegramAdapter(BasePlatformAdapter):
return build_session_key(
event.source,
group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False),
)
def _enqueue_text_event(self, event: MessageEvent) -> None:
@ -1769,6 +1770,7 @@ class TelegramAdapter(BasePlatformAdapter):
session_key = build_session_key(
event.source,
group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False),
)
media_group_id = getattr(msg, "media_group_id", None)
if media_group_id:

View file

@ -770,6 +770,7 @@ class GatewayRunner:
return build_session_key(
source,
group_sessions_per_user=getattr(config, "group_sessions_per_user", True),
thread_sessions_per_user=getattr(config, "thread_sessions_per_user", False),
)
def _resolve_turn_agent_config(self, user_message: str, model: str, runtime_kwargs: dict) -> dict:
@ -1498,6 +1499,10 @@ class GatewayRunner:
"group_sessions_per_user",
self.config.group_sessions_per_user,
)
config.extra.setdefault(
"thread_sessions_per_user",
getattr(self.config, "thread_sessions_per_user", False),
)
if platform == Platform.TELEGRAM:
from gateway.platforms.telegram import TelegramAdapter, check_telegram_requirements
@ -2662,6 +2667,23 @@ class GatewayRunner:
# tool even when they appear in the same message.
# -----------------------------------------------------------------
message_text = event.text or ""
# -----------------------------------------------------------------
# Sender attribution for shared thread sessions.
#
# When multiple users share a single thread session (the default for
# threads), prefix each message with [sender name] so the agent can
# tell participants apart. Skip for DMs (single-user by nature) and
# when per-user thread isolation is explicitly enabled.
# -----------------------------------------------------------------
_is_shared_thread = (
source.chat_type != "dm"
and source.thread_id
and not getattr(self.config, "thread_sessions_per_user", False)
)
if _is_shared_thread and source.user_name:
message_text = f"[{source.user_name}] {message_text}"
if event.media_urls:
image_paths = []
for i, path in enumerate(event.media_urls):

View file

@ -254,8 +254,22 @@ def build_session_context_prompt(
if context.source.chat_topic:
lines.append(f"**Channel Topic:** {context.source.chat_topic}")
# User identity (especially useful for WhatsApp where multiple people DM)
if context.source.user_name:
# User identity.
# In shared thread sessions (non-DM with thread_id), multiple users
# contribute to the same conversation. Don't pin a single user name
# in the system prompt — it changes per-turn and would bust the prompt
# cache. Instead, note that this is a multi-user thread; individual
# sender names are prefixed on each user message by the gateway.
_is_shared_thread = (
context.source.chat_type != "dm"
and context.source.thread_id
)
if _is_shared_thread:
lines.append(
"**Session type:** Multi-user thread — messages are prefixed "
"with [sender name]. Multiple users may participate."
)
elif context.source.user_name:
lines.append(f"**User:** {context.source.user_name}")
elif context.source.user_id:
uid = context.source.user_id
@ -427,7 +441,11 @@ class SessionEntry:
)
def build_session_key(source: SessionSource, group_sessions_per_user: bool = True) -> str:
def build_session_key(
source: SessionSource,
group_sessions_per_user: bool = True,
thread_sessions_per_user: bool = False,
) -> str:
"""Build a deterministic session key from a message source.
This is the single source of truth for session key construction.
@ -442,7 +460,11 @@ def build_session_key(source: SessionSource, group_sessions_per_user: bool = Tru
- chat_id identifies the parent group/channel.
- user_id/user_id_alt isolates participants within that parent chat when available when
``group_sessions_per_user`` is enabled.
- thread_id differentiates threads within that parent chat.
- thread_id differentiates threads within that parent chat. When
``thread_sessions_per_user`` is False (default), threads are *shared* across all
participants user_id is NOT appended, so every user in the thread
shares a single session. This is the expected UX for threaded
conversations (Telegram forum topics, Discord threads, Slack threads).
- Without participant identifiers, or when isolation is disabled, messages fall back to one
shared session per chat.
- Without identifiers, messages fall back to one session per platform/chat_type.
@ -464,7 +486,15 @@ def build_session_key(source: SessionSource, group_sessions_per_user: bool = Tru
key_parts.append(source.chat_id)
if source.thread_id:
key_parts.append(source.thread_id)
if group_sessions_per_user and participant_id:
# In threads, default to shared sessions (all participants see the same
# conversation). Per-user isolation only applies when explicitly enabled
# via thread_sessions_per_user, or when there is no thread (regular group).
isolate_user = group_sessions_per_user
if source.thread_id and not thread_sessions_per_user:
isolate_user = False
if isolate_user and participant_id:
key_parts.append(str(participant_id))
return ":".join(key_parts)
@ -552,6 +582,7 @@ class SessionStore:
return build_session_key(
source,
group_sessions_per_user=getattr(self.config, "group_sessions_per_user", True),
thread_sessions_per_user=getattr(self.config, "thread_sessions_per_user", False),
)
def _is_session_expired(self, entry: SessionEntry) -> bool:

View file

@ -109,6 +109,7 @@ class TestGatewayConfigRoundtrip:
reset_triggers=["/new"],
quick_commands={"limits": {"type": "exec", "command": "echo ok"}},
group_sessions_per_user=False,
thread_sessions_per_user=True,
)
d = config.to_dict()
restored = GatewayConfig.from_dict(d)
@ -118,6 +119,7 @@ class TestGatewayConfigRoundtrip:
assert restored.reset_triggers == ["/new"]
assert restored.quick_commands == {"limits": {"type": "exec", "command": "echo ok"}}
assert restored.group_sessions_per_user is False
assert restored.thread_sessions_per_user is True
def test_roundtrip_preserves_unauthorized_dm_behavior(self):
config = GatewayConfig(
@ -167,6 +169,30 @@ class TestLoadGatewayConfig:
assert config.group_sessions_per_user is False
def test_bridges_thread_sessions_per_user_from_config_yaml(self, tmp_path, monkeypatch):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
config_path = hermes_home / "config.yaml"
config_path.write_text("thread_sessions_per_user: true\n", encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
config = load_gateway_config()
assert config.thread_sessions_per_user is True
def test_thread_sessions_per_user_defaults_to_false(self, tmp_path, monkeypatch):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
config_path = hermes_home / "config.yaml"
config_path.write_text("{}\n", encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
config = load_gateway_config()
assert config.thread_sessions_per_user is False
def test_invalid_quick_commands_in_config_yaml_are_ignored(self, tmp_path, monkeypatch):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()

View file

@ -291,6 +291,69 @@ class TestBuildSessionContextPrompt:
assert "WhatsApp" in prompt or "whatsapp" in prompt.lower()
def test_multi_user_thread_prompt(self):
"""Shared thread sessions show multi-user note instead of single user."""
config = GatewayConfig(
platforms={
Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake"),
},
)
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="-1002285219667",
chat_name="Test Group",
chat_type="group",
thread_id="17585",
user_name="Alice",
)
ctx = build_session_context(source, config)
prompt = build_session_context_prompt(ctx)
assert "Multi-user thread" in prompt
assert "[sender name]" in prompt
# Should NOT show a specific **User:** line (would bust cache)
assert "**User:** Alice" not in prompt
def test_non_thread_group_shows_user(self):
"""Regular group messages (no thread) still show the user name."""
config = GatewayConfig(
platforms={
Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake"),
},
)
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="-1002285219667",
chat_name="Test Group",
chat_type="group",
user_name="Alice",
)
ctx = build_session_context(source, config)
prompt = build_session_context_prompt(ctx)
assert "**User:** Alice" in prompt
assert "Multi-user thread" not in prompt
def test_dm_thread_shows_user_not_multi(self):
"""DM threads are single-user and should show User, not multi-user note."""
config = GatewayConfig(
platforms={
Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake"),
},
)
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="99",
chat_type="dm",
thread_id="topic-1",
user_name="Alice",
)
ctx = build_session_context(source, config)
prompt = build_session_context_prompt(ctx)
assert "**User:** Alice" in prompt
assert "Multi-user thread" not in prompt
class TestSessionStoreRewriteTranscript:
"""Regression: /retry and /undo must persist truncated history to disk."""
@ -636,7 +699,28 @@ class TestWhatsAppDMSessionKeyConsistency:
key = build_session_key(source)
assert key == "agent:main:telegram:group:-1002285219667:17585"
def test_group_thread_sessions_are_isolated_per_user(self):
def test_group_thread_sessions_are_shared_by_default(self):
"""Threads default to shared sessions — user_id is NOT appended."""
alice = SessionSource(
platform=Platform.TELEGRAM,
chat_id="-1002285219667",
chat_type="group",
thread_id="17585",
user_id="alice",
)
bob = SessionSource(
platform=Platform.TELEGRAM,
chat_id="-1002285219667",
chat_type="group",
thread_id="17585",
user_id="bob",
)
assert build_session_key(alice) == "agent:main:telegram:group:-1002285219667:17585"
assert build_session_key(bob) == "agent:main:telegram:group:-1002285219667:17585"
assert build_session_key(alice) == build_session_key(bob)
def test_group_thread_sessions_can_be_isolated_per_user(self):
"""thread_sessions_per_user=True restores per-user isolation in threads."""
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="-1002285219667",
@ -644,9 +728,60 @@ class TestWhatsAppDMSessionKeyConsistency:
thread_id="17585",
user_id="42",
)
key = build_session_key(source)
key = build_session_key(source, thread_sessions_per_user=True)
assert key == "agent:main:telegram:group:-1002285219667:17585:42"
def test_non_thread_group_sessions_still_isolated_per_user(self):
"""Regular group messages (no thread_id) remain per-user by default."""
alice = SessionSource(
platform=Platform.TELEGRAM,
chat_id="-1002285219667",
chat_type="group",
user_id="alice",
)
bob = SessionSource(
platform=Platform.TELEGRAM,
chat_id="-1002285219667",
chat_type="group",
user_id="bob",
)
assert build_session_key(alice) == "agent:main:telegram:group:-1002285219667:alice"
assert build_session_key(bob) == "agent:main:telegram:group:-1002285219667:bob"
assert build_session_key(alice) != build_session_key(bob)
def test_discord_thread_sessions_shared_by_default(self):
"""Discord threads are shared across participants by default."""
alice = SessionSource(
platform=Platform.DISCORD,
chat_id="guild-123",
chat_type="thread",
thread_id="thread-456",
user_id="alice",
)
bob = SessionSource(
platform=Platform.DISCORD,
chat_id="guild-123",
chat_type="thread",
thread_id="thread-456",
user_id="bob",
)
assert build_session_key(alice) == build_session_key(bob)
assert "alice" not in build_session_key(alice)
assert "bob" not in build_session_key(bob)
def test_dm_thread_sessions_not_affected(self):
"""DM threads use their own keying logic and are not affected."""
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="99",
chat_type="dm",
thread_id="topic-1",
user_id="42",
)
key = build_session_key(source)
# DM logic: chat_id + thread_id, user_id never included
assert key == "agent:main:telegram:dm:99:topic-1"
class TestSessionStoreEntriesAttribute:
"""Regression: /reset must access _entries, not _sessions."""