diff --git a/gateway/config.py b/gateway/config.py index 6756755c3a9..ab03bc1e0c1 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -766,10 +766,18 @@ def load_gateway_config() -> GatewayConfig: bridged["dm_policy"] = platform_cfg["dm_policy"] if "allow_from" in platform_cfg: bridged["allow_from"] = platform_cfg["allow_from"] + if "allow_admin_from" in platform_cfg: + bridged["allow_admin_from"] = platform_cfg["allow_admin_from"] + if "user_allowed_commands" in platform_cfg: + bridged["user_allowed_commands"] = platform_cfg["user_allowed_commands"] if "group_policy" in platform_cfg: bridged["group_policy"] = platform_cfg["group_policy"] if "group_allow_from" in platform_cfg: bridged["group_allow_from"] = platform_cfg["group_allow_from"] + if "group_allow_admin_from" in platform_cfg: + bridged["group_allow_admin_from"] = platform_cfg["group_allow_admin_from"] + if "group_user_allowed_commands" in platform_cfg: + bridged["group_user_allowed_commands"] = platform_cfg["group_user_allowed_commands"] if plat in (Platform.DISCORD, Platform.SLACK) and "channel_skill_bindings" in platform_cfg: bridged["channel_skill_bindings"] = platform_cfg["channel_skill_bindings"] if "channel_prompts" in platform_cfg: diff --git a/gateway/run.py b/gateway/run.py index ba40995b859..c95c09da6a6 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -5583,6 +5583,17 @@ class GatewayRunner: _evt_cmd = event.get_command() _cmd_def_inner = _resolve_cmd_inner(_evt_cmd) if _evt_cmd else None + # Slash command access control on the running-agent fast-path. + # Mirrors the cold-path gate further below so non-admin users + # can't bypass gating just because an agent happens to be busy. + # /status above is intentionally pre-gate so users always see + # session state. /help and /whoami fall under the always-allowed + # floor inside _check_slash_access. + if _evt_cmd and _cmd_def_inner is not None: + _denied = self._check_slash_access(source, _cmd_def_inner.name) + if _denied is not None: + return _denied + if _cmd_def_inner and _cmd_def_inner.name == "restart": return await self._handle_restart_command(event) @@ -5901,6 +5912,17 @@ class GatewayRunner: _cmd_def = _resolve_cmd(command) if command else None canonical = _cmd_def.name if _cmd_def else command + # Per-platform slash command access control. Only kicks in when the + # operator has set ``allow_admin_from`` for the source's scope (DM + # vs group). When unset → backward-compat: every allowed user can + # run every command. When set → non-admins can run only commands in + # ``user_allowed_commands`` (plus the always-allowed floor: /help, + # /whoami). Plain chat is unaffected — only slash commands gate. + if command and canonical and is_gateway_known_command(canonical): + _denied = self._check_slash_access(source, canonical) + if _denied is not None: + return _denied + # Fire the ``command:`` hook for any recognized slash # command — built-in OR plugin-registered. Handlers can return a # dict with ``{"decision": "deny" | "handled" | "rewrite", ...}`` @@ -5984,6 +6006,9 @@ class GatewayRunner: if canonical == "profile": return await self._handle_profile_command(event) + if canonical == "whoami": + return await self._handle_whoami_command(event) + if canonical == "status": return await self._handle_status_command(event) @@ -7827,6 +7852,101 @@ class GatewayRunner: return "\n".join(lines) + def _check_slash_access( + self, source: SessionSource, canonical_cmd: str + ) -> Optional[str]: + """Return a denial message if ``source`` cannot run ``canonical_cmd``, + else None. Used by both the cold and running-agent dispatch paths + in ``_handle_message`` so admin/user gating can't be bypassed by + an in-flight agent. + + Backward-compat semantics live in + :func:`gateway.slash_access.policy_for_source` — when the operator + hasn't set ``allow_admin_from`` for the scope, the policy returns + ``enabled=False`` and this method always returns None. + """ + from gateway.slash_access import policy_for_source as _policy_for_source + + if not canonical_cmd: + return None + policy = _policy_for_source(self.config, source) + if not policy.enabled or policy.can_run(source.user_id, canonical_cmd): + return None + logger.info( + "Slash command /%s denied for %s:%s (not admin, not in user_allowed_commands)", + canonical_cmd, + source.platform.value if source.platform else "?", + source.user_id, + ) + allowed_preview = sorted(policy.user_allowed_commands) + if allowed_preview: + suffix = ( + "You can run: " + + ", ".join(f"/{c}" for c in allowed_preview[:12]) + + ("…" if len(allowed_preview) > 12 else "") + + ". Use /whoami for the full list." + ) + else: + suffix = ( + "No slash commands are enabled for non-admins on this " + "platform. Ask an admin to add you to allow_admin_from " + "or to set user_allowed_commands." + ) + return f"⛔ /{canonical_cmd} is admin-only here. {suffix}" + + + async def _handle_whoami_command(self, event: MessageEvent) -> str: + """Handle /whoami — show the user's slash command access on this scope. + + Always works (it's in the always-allowed floor of slash_access). + Reports: platform, scope (DM vs group), the user's tier + (admin / user / unrestricted), and the slash commands they can + actually run on this scope. + """ + from gateway.slash_access import policy_for_source as _policy_for_source + + source = event.source + policy = _policy_for_source(self.config, source) + platform = source.platform.value if source and source.platform else "?" + chat_type = (source.chat_type if source else "") or "dm" + scope = "DM" if chat_type.lower() in ("dm", "direct", "private", "") else "group/channel" + user_id = (source.user_id if source else None) or "?" + + if not policy.enabled: + return ( + f"**You** — {platform} ({scope})\n" + f"User ID: `{user_id}`\n" + f"Tier: unrestricted (no admin list configured for this scope)\n" + f"Slash commands: all available" + ) + + if policy.is_admin(user_id): + return ( + f"**You** — {platform} ({scope})\n" + f"User ID: `{user_id}`\n" + f"Tier: **admin**\n" + f"Slash commands: all available" + ) + + # Non-admin user. Show what's actually reachable. + floor = ["help", "whoami"] # mirrors slash_access._ALWAYS_ALLOWED_FOR_USERS + configured = sorted(policy.user_allowed_commands) + # Combine + dedupe, preserve order: floor first, then operator additions. + seen: set[str] = set() + runnable: list[str] = [] + for c in floor + configured: + if c not in seen: + seen.add(c) + runnable.append(c) + runnable_str = ", ".join(f"/{c}" for c in runnable) if runnable else "(none)" + return ( + f"**You** — {platform} ({scope})\n" + f"User ID: `{user_id}`\n" + f"Tier: user\n" + f"Slash commands you can run: {runnable_str}" + ) + + async def _handle_kanban_command(self, event: MessageEvent) -> str: """Handle /kanban — delegate to the shared kanban CLI. diff --git a/gateway/slash_access.py b/gateway/slash_access.py new file mode 100644 index 00000000000..e4a398dc14a --- /dev/null +++ b/gateway/slash_access.py @@ -0,0 +1,229 @@ +"""Per-platform slash command access control. + +This module sits beside the existing per-platform allowlist (``allow_from``) +and adds a second axis: of the users who are *allowed to talk to the +gateway*, which ones can run *which slash commands*. + +Two lists per platform scope (DM vs group, mirroring ``allow_from`` vs +``group_allow_from``): + + - ``allow_admin_from`` — user IDs that get every registered slash + command (built-in + plugin-registered). + - ``user_allowed_commands`` — slash command names non-admin users may + run. Empty / unset → non-admins get no + slash commands. + +Backward compatibility: + + If ``allow_admin_from`` is not set for a scope, slash command gating + is disabled entirely for that scope. Every allowed user can run every + slash command, exactly like before. This means existing installs are + unaffected until an operator opts in by listing at least one admin. + +The gate is applied at the slash command dispatch site in +``gateway/run.py`` so it covers BOTH built-in and plugin-registered +commands via the live registry. Gating slash commands does not affect +plain chat — non-admin users can still talk to the agent normally, +they just can't trigger commands outside ``user_allowed_commands``. + +Authored as a slimmed-down salvage of PR #4443's permission tiers +(co-authored by @ReqX). The full tier system, audit log, usage +tracking, rate limiting, and tool filtering from that PR are not +included here — only the slash-command access split. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, FrozenSet, Iterable, Optional, Tuple + + +# Slash commands that MUST stay reachable for any allowed user, even when +# slash gating is enabled and the user has no commands listed. Without this +# carve-out, a non-admin user has no way to discover what they can or +# can't do (``/help``, ``/whoami``) and no way to see what state the agent +# is in (``/status``). These mirror the smallest set of read-only commands +# we'd hand to a guest. Operators can still narrow this further by writing +# their own ``user_allowed_commands`` (this set is only the implicit +# fallback floor — anything in ``user_allowed_commands`` overrides it +# additively, never restrictively). +_ALWAYS_ALLOWED_FOR_USERS: FrozenSet[str] = frozenset({ + "help", + "whoami", +}) + + +@dataclass(frozen=True) +class SlashAccessPolicy: + """Resolved access policy for a single (platform, scope) pair. + + ``scope`` is ``"dm"`` for direct messages and ``"group"`` for groups, + channels, threads, and any other multi-user context. The mapping from + SessionSource.chat_type → scope happens in ``policy_for_source``. + """ + + enabled: bool # gating active for this scope? + admin_user_ids: FrozenSet[str] + user_allowed_commands: FrozenSet[str] + + def is_admin(self, user_id: Optional[str]) -> bool: + if not self.enabled: + # Gating disabled → treat every allowed user as admin so + # downstream code can keep using ``is_admin`` / ``can_run`` + # uniformly. + return True + if not user_id: + return False + return str(user_id) in self.admin_user_ids + + def can_run(self, user_id: Optional[str], canonical_cmd: str) -> bool: + if not self.enabled: + return True + if self.is_admin(user_id): + return True + if not canonical_cmd: + return False + if canonical_cmd in _ALWAYS_ALLOWED_FOR_USERS: + return True + return canonical_cmd in self.user_allowed_commands + + +_DM_CHAT_TYPES = frozenset({"dm", "direct", "private", ""}) + + +def _coerce_id_list(raw: Any) -> FrozenSet[str]: + """Normalize a YAML-loaded admin/user list into a frozenset of strings. + + Accepts ``None``, list, tuple, or comma-separated string. Stringifies + each entry and strips whitespace; empty entries are dropped. + """ + if raw is None: + return frozenset() + if isinstance(raw, (list, tuple, set, frozenset)): + items: Iterable[Any] = raw + elif isinstance(raw, str): + items = (s for s in raw.split(",") if s.strip()) + else: + # single scalar (int user id, etc.) + items = (raw,) + out: list[str] = [] + for it in items: + s = str(it).strip() + if s: + out.append(s) + return frozenset(out) + + +def _coerce_command_list(raw: Any) -> FrozenSet[str]: + """Normalize a slash command allowlist. + + Strips leading slashes so YAML can read either ``["help", "status"]`` + or ``["/help", "/status"]``. Lowercase canonicalization matches how + ``resolve_command()`` stores names. + """ + if raw is None: + return frozenset() + if isinstance(raw, (list, tuple, set, frozenset)): + items: Iterable[Any] = raw + elif isinstance(raw, str): + items = (s for s in raw.split(",") if s.strip()) + else: + items = (raw,) + out: list[str] = [] + for it in items: + s = str(it).strip().lstrip("/").lower() + if s: + out.append(s) + return frozenset(out) + + +def _scope_for_chat_type(chat_type: Optional[str]) -> str: + if chat_type and chat_type.lower() in _DM_CHAT_TYPES: + return "dm" + return "group" + + +def _platform_extra(platform_config: Any) -> dict: + """Return the ``extra`` dict from a PlatformConfig-like object. + + Defensively handles None and non-PlatformConfig shapes so calling + code can stay simple. + """ + if platform_config is None: + return {} + extra = getattr(platform_config, "extra", None) + if isinstance(extra, dict): + return extra + if isinstance(platform_config, dict): + # Some test harnesses pass dicts directly. + return platform_config + return {} + + +def _keys_for_scope(scope: str) -> Tuple[str, str]: + """Return (admin_key, user_cmd_key) names for a scope.""" + if scope == "group": + return ("group_allow_admin_from", "group_user_allowed_commands") + return ("allow_admin_from", "user_allowed_commands") + + +def policy_from_extra(extra: dict, scope: str) -> SlashAccessPolicy: + """Build a policy from a platform's ``extra`` dict for one scope. + + DM scope falls back to group scope keys ONLY for ``user_allowed_commands`` + when the DM scope didn't specify its own. This keeps the common case + (operator wants the same command set DM and group) ergonomic without + forcing duplication. Admin lists are NOT cross-scope: an admin in + DMs is not implicitly an admin in a group. + """ + admin_key, cmd_key = _keys_for_scope(scope) + admin_ids = _coerce_id_list(extra.get(admin_key)) + cmds = _coerce_command_list(extra.get(cmd_key)) + + if scope == "dm" and not cmds: + # DM didn't specify — let group's user_allowed_commands fall through + # so operators only need to list it once if it's the same. + cmds = _coerce_command_list(extra.get("group_user_allowed_commands")) + + enabled = bool(admin_ids) + return SlashAccessPolicy( + enabled=enabled, + admin_user_ids=admin_ids, + user_allowed_commands=cmds, + ) + + +def policy_for_source(gateway_config: Any, source: Any) -> SlashAccessPolicy: + """Resolve the access policy for a SessionSource. + + Returns a "disabled" policy (gating off, allow everything) when: + - gateway_config is None + - the platform has no PlatformConfig + - the platform's PlatformConfig has no admin list set for the scope + + Callers should treat the returned policy as authoritative for slash + command gating only. It does not gate plain chat messages. + """ + if gateway_config is None or source is None: + return SlashAccessPolicy( + enabled=False, + admin_user_ids=frozenset(), + user_allowed_commands=frozenset(), + ) + platforms = getattr(gateway_config, "platforms", None) + platform_config = None + if platforms is not None: + try: + platform_config = platforms.get(source.platform) + except Exception: + platform_config = None + extra = _platform_extra(platform_config) + scope = _scope_for_chat_type(getattr(source, "chat_type", None)) + return policy_from_extra(extra, scope) + + +__all__ = [ + "SlashAccessPolicy", + "policy_from_extra", + "policy_for_source", +] diff --git a/hermes_cli/commands.py b/hermes_cli/commands.py index de41bcfae7e..5889e3d222a 100644 --- a/hermes_cli/commands.py +++ b/hermes_cli/commands.py @@ -103,6 +103,7 @@ COMMAND_REGISTRY: list[CommandDef] = [ CommandDef("goal", "Set a standing goal Hermes works on across turns until achieved", "Session", args_hint="[text | pause | resume | clear | status]"), CommandDef("status", "Show session info", "Session"), + CommandDef("whoami", "Show your slash command access (admin / user)", "Info"), CommandDef("profile", "Show active profile name and home directory", "Info"), CommandDef("sethome", "Set this chat as the home channel", "Session", gateway_only=True, aliases=("set-home",)), diff --git a/scripts/release.py b/scripts/release.py index 6d7e463903a..2187f1046ef 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -63,6 +63,7 @@ AUTHOR_MAP = { "mason@growagainorchids.com": "masonjames", "ytchen0719@gmail.com": "liquidchen", "am@studio1.tailb672fe.ts.net": "subtract0", + "mike@grossmann.at": "ReqX", "axmaiqiu@gmail.com": "qWaitCrypto", "44045911+kidonng@users.noreply.github.com": "kidonng", "daniellsmarta@gmail.com": "DanielLSM", diff --git a/tests/gateway/test_slash_access.py b/tests/gateway/test_slash_access.py new file mode 100644 index 00000000000..5e21ac8b610 --- /dev/null +++ b/tests/gateway/test_slash_access.py @@ -0,0 +1,289 @@ +"""Unit tests for gateway.slash_access — per-platform slash command access control. + +Tests the pure policy resolver (no gateway plumbing). Integration tests that +exercise the dispatch site live in test_slash_access_dispatch.py. +""" +from __future__ import annotations + +from gateway.config import GatewayConfig, Platform, PlatformConfig +from gateway.session import SessionSource +from gateway.slash_access import ( + SlashAccessPolicy, + policy_for_source, + policy_from_extra, +) + + +# --------------------------------------------------------------------------- +# policy_from_extra — input normalization + scope resolution +# --------------------------------------------------------------------------- + + +class TestPolicyFromExtra: + def test_empty_extra_is_disabled(self): + p = policy_from_extra({}, "dm") + assert p.enabled is False + assert p.admin_user_ids == frozenset() + assert p.user_allowed_commands == frozenset() + + def test_disabled_policy_treats_anyone_as_admin(self): + # When gating is off, downstream code uses is_admin/can_run uniformly. + # Both must short-circuit to True so existing behavior is preserved. + p = policy_from_extra({}, "dm") + assert p.is_admin("anyone") is True + assert p.can_run("anyone", "stop") is True + + def test_dm_admin_list_only(self): + p = policy_from_extra({"allow_admin_from": ["111", "222"]}, "dm") + assert p.enabled is True + assert p.admin_user_ids == frozenset({"111", "222"}) + assert p.user_allowed_commands == frozenset() + + def test_admin_runs_anything(self): + p = policy_from_extra( + {"allow_admin_from": [111], "user_allowed_commands": ["help"]}, + "dm", + ) + assert p.is_admin("111") is True + assert p.can_run("111", "stop") is True + assert p.can_run("111", "kanban") is True + + def test_non_admin_runs_only_listed_commands(self): + p = policy_from_extra( + { + "allow_admin_from": ["111"], + "user_allowed_commands": ["status", "model"], + }, + "dm", + ) + assert p.is_admin("999") is False + assert p.can_run("999", "status") is True + assert p.can_run("999", "model") is True + assert p.can_run("999", "stop") is False + assert p.can_run("999", "kanban") is False + + def test_always_allowed_floor_for_non_admin(self): + # /help and /whoami always reachable so users can see what they can do. + p = policy_from_extra( + {"allow_admin_from": ["111"], "user_allowed_commands": []}, + "dm", + ) + assert p.can_run("999", "help") is True + assert p.can_run("999", "whoami") is True + assert p.can_run("999", "stop") is False + + def test_unknown_user_id_blocked(self): + # Empty/None user_id → no admin status, no command access (except floor). + p = policy_from_extra( + {"allow_admin_from": ["111"], "user_allowed_commands": ["status"]}, + "dm", + ) + assert p.is_admin(None) is False + assert p.can_run(None, "status") is True # listed command works + assert p.can_run(None, "stop") is False + assert p.can_run("", "stop") is False + + def test_id_coercion_ints_become_strings(self): + # YAML often loads numeric IDs as ints; we stringify on ingest. + p = policy_from_extra({"allow_admin_from": [12345, 67890]}, "dm") + assert p.admin_user_ids == frozenset({"12345", "67890"}) + assert p.is_admin("12345") is True + assert p.is_admin(12345) is True # is_admin also stringifies + + def test_id_coercion_csv_string(self): + p = policy_from_extra({"allow_admin_from": "111, 222 ,333"}, "dm") + assert p.admin_user_ids == frozenset({"111", "222", "333"}) + + def test_command_coercion_strips_leading_slash_and_lowercases(self): + p = policy_from_extra( + { + "allow_admin_from": ["111"], + "user_allowed_commands": ["/Status", "MODEL", "/help"], + }, + "dm", + ) + assert p.user_allowed_commands == frozenset({"status", "model", "help"}) + + def test_command_coercion_csv_string(self): + p = policy_from_extra( + { + "allow_admin_from": ["111"], + "user_allowed_commands": "status, model , /help", + }, + "dm", + ) + assert p.user_allowed_commands == frozenset({"status", "model", "help"}) + + def test_group_scope_uses_group_keys(self): + extra = { + "allow_admin_from": ["111"], # DM admins + "user_allowed_commands": ["status"], # DM commands + "group_allow_admin_from": ["222"], + "group_user_allowed_commands": ["help"], + } + dm = policy_from_extra(extra, "dm") + gp = policy_from_extra(extra, "group") + assert dm.admin_user_ids == frozenset({"111"}) + assert gp.admin_user_ids == frozenset({"222"}) + assert dm.user_allowed_commands == frozenset({"status"}) + # group's user_allowed_commands does not leak into DM's allowed list + # except via the explicit fallback rule (only when DM list is unset). + assert "help" in gp.user_allowed_commands + + def test_dm_falls_back_to_group_user_commands_when_dm_unset(self): + # Common case: operator wants the same command set DM and group; + # they should only have to list it once on the group keys. + extra = { + "allow_admin_from": ["111"], + "group_user_allowed_commands": ["status", "model"], + } + dm = policy_from_extra(extra, "dm") + assert dm.user_allowed_commands == frozenset({"status", "model"}) + + def test_dm_admin_does_not_imply_group_admin(self): + # Admin lists are scope-specific. DM admin must not auto-promote in groups. + extra = {"allow_admin_from": ["111"]} + dm = policy_from_extra(extra, "dm") + gp = policy_from_extra(extra, "group") + assert dm.is_admin("111") is True + # Group has no admin list set → gating disabled in groups → "111" + # gets unrestricted access, but that's the backward-compat fallback, + # not implicit admin promotion. The distinction matters when the + # group DOES have an admin list set: + extra2 = { + "allow_admin_from": ["111"], + "group_allow_admin_from": ["222"], + } + gp2 = policy_from_extra(extra2, "group") + assert gp2.is_admin("111") is False + assert gp2.is_admin("222") is True + + +# --------------------------------------------------------------------------- +# policy_for_source — wires GatewayConfig + SessionSource together +# --------------------------------------------------------------------------- + + +class TestPolicyForSource: + def test_no_config_returns_disabled(self): + p = policy_for_source(None, None) + assert p.enabled is False + assert p.is_admin("anyone") is True + + def test_no_platform_config_returns_disabled(self): + cfg = GatewayConfig(platforms={}) + src = SessionSource( + platform=Platform.DISCORD, chat_id="42", chat_type="dm", user_id="7" + ) + p = policy_for_source(cfg, src) + assert p.enabled is False + + def test_dm_chat_type_resolves_to_dm_scope(self): + cfg = GatewayConfig( + platforms={ + Platform.DISCORD: PlatformConfig( + enabled=True, + extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": ["status"], + "group_allow_admin_from": ["222"], + "group_user_allowed_commands": ["help"], + }, + ) + } + ) + dm_src = SessionSource( + platform=Platform.DISCORD, chat_id="A", chat_type="dm", user_id="111" + ) + p = policy_for_source(cfg, dm_src) + assert p.is_admin("111") is True + assert p.can_run("999", "status") is True + assert p.can_run("999", "help") is True # always-allowed floor + assert p.can_run("999", "kanban") is False + + def test_group_chat_type_resolves_to_group_scope(self): + cfg = GatewayConfig( + platforms={ + Platform.DISCORD: PlatformConfig( + enabled=True, + extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": ["status"], + "group_allow_admin_from": ["222"], + "group_user_allowed_commands": ["help"], + }, + ) + } + ) + grp_src = SessionSource( + platform=Platform.DISCORD, chat_id="G", chat_type="group", user_id="222" + ) + p = policy_for_source(cfg, grp_src) + assert p.is_admin("222") is True + assert p.is_admin("111") is False # DM admin, not group admin + # In group scope, the only listed user command is "help"; "status" + # is not in the group list and should be denied for non-admins. + assert p.can_run("999", "help") is True + assert p.can_run("999", "status") is False + + def test_channel_thread_chat_types_treated_as_group_scope(self): + # Discord channels and threads are group-scoped, not DM-scoped. + cfg = GatewayConfig( + platforms={ + Platform.DISCORD: PlatformConfig( + enabled=True, + extra={ + "allow_admin_from": ["111"], + "group_allow_admin_from": ["222"], + }, + ) + } + ) + for ct in ("group", "channel", "thread", "supergroup"): + src = SessionSource( + platform=Platform.DISCORD, chat_id="X", chat_type=ct, user_id="222" + ) + p = policy_for_source(cfg, src) + assert p.is_admin("222") is True, f"chat_type={ct} should map to group scope" + assert p.is_admin("111") is False, f"chat_type={ct} should not see DM admins" + + def test_no_admin_list_for_dm_means_unrestricted_in_dm(self): + # Group has admin list, DM does not → DM gating disabled, group active. + cfg = GatewayConfig( + platforms={ + Platform.DISCORD: PlatformConfig( + enabled=True, + extra={"group_allow_admin_from": ["222"]}, + ) + } + ) + dm_src = SessionSource( + platform=Platform.DISCORD, chat_id="A", chat_type="dm", user_id="999" + ) + grp_src = SessionSource( + platform=Platform.DISCORD, chat_id="G", chat_type="group", user_id="999" + ) + dm_p = policy_for_source(cfg, dm_src) + grp_p = policy_for_source(cfg, grp_src) + assert dm_p.enabled is False + assert dm_p.can_run("999", "stop") is True # backward compat + assert grp_p.enabled is True + assert grp_p.can_run("999", "stop") is False # gated + + def test_per_platform_isolation(self): + # Discord has gating, Telegram doesn't → Telegram is unaffected. + cfg = GatewayConfig( + platforms={ + Platform.DISCORD: PlatformConfig( + enabled=True, + extra={"allow_admin_from": ["111"]}, + ), + Platform.TELEGRAM: PlatformConfig(enabled=True, extra={}), + } + ) + tg_src = SessionSource( + platform=Platform.TELEGRAM, chat_id="T", chat_type="dm", user_id="999" + ) + p = policy_for_source(cfg, tg_src) + assert p.enabled is False + assert p.can_run("999", "stop") is True diff --git a/tests/gateway/test_slash_access_dispatch.py b/tests/gateway/test_slash_access_dispatch.py new file mode 100644 index 00000000000..1e26c93e0eb --- /dev/null +++ b/tests/gateway/test_slash_access_dispatch.py @@ -0,0 +1,558 @@ +"""Integration tests for slash command access control gating in gateway/run.py. + +Drives the real ``GatewayRunner._handle_message`` path with a stub session +store so we exercise the actual gate inserted at the dispatch site (not a +re-implementation in the test). Uses the same ``object.__new__`` runner +construction pattern as test_status_command.py. + +Coverage targets: + - Backward compat: no ``allow_admin_from`` set → behaves exactly as before + (no denial messages, dispatch reaches the real handler). + - Admin path: user in ``allow_admin_from`` runs anything. + - User path: user not in admin list, but command in + ``user_allowed_commands`` → allowed. + - User denied: command not in either list → returns the ⛔ denial. + - Always-allowed floor: /help and /whoami reachable for non-admins + even with empty user_allowed_commands. + - DM vs group scope isolation. +""" +from __future__ import annotations + +from datetime import datetime +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from gateway.config import GatewayConfig, Platform, PlatformConfig +from gateway.platforms.base import MessageEvent +from gateway.session import SessionEntry, SessionSource, build_session_key + + +def _make_source( + *, + platform: Platform = Platform.DISCORD, + user_id: str = "user1", + chat_type: str = "dm", + chat_id: str = "c1", +) -> SessionSource: + return SessionSource( + platform=platform, + user_id=user_id, + chat_id=chat_id, + user_name=f"name-{user_id}", + chat_type=chat_type, + ) + + +def _make_event(text: str, source: SessionSource) -> MessageEvent: + return MessageEvent(text=text, source=source, message_id="m1") + + +def _make_runner(*, platform_extra: dict | None = None, + platform: Platform = Platform.DISCORD): + from gateway.run import GatewayRunner + + runner = object.__new__(GatewayRunner) + runner.config = GatewayConfig( + platforms={ + platform: PlatformConfig( + enabled=True, + token="***", + extra=platform_extra or {}, + ) + } + ) + adapter = MagicMock() + adapter.send = AsyncMock() + runner.adapters = {platform: adapter} + runner._voice_mode = {} + runner.hooks = SimpleNamespace( + emit=AsyncMock(), + emit_collect=AsyncMock(return_value=[]), + loaded_hooks=False, + ) + runner.session_store = MagicMock() + session_entry = SessionEntry( + session_key="agent:main:discord:dm:c1", + session_id="sess-1", + created_at=datetime.now(), + updated_at=datetime.now(), + platform=platform, + chat_type="dm", + total_tokens=0, + ) + runner.session_store.get_or_create_session.return_value = session_entry + runner.session_store.load_transcript.return_value = [] + runner.session_store.has_any_sessions.return_value = True + runner.session_store.append_to_transcript = MagicMock() + runner.session_store.rewrite_transcript = MagicMock() + runner.session_store.update_session = MagicMock() + runner._running_agents = {} + runner._running_agents_ts = {} + runner._session_run_generation = {} + runner._pending_messages = {} + runner._pending_approvals = {} + runner._session_sources = {} + runner._session_db = MagicMock() + runner._session_db.get_session_title.return_value = None + runner._session_db.get_session.return_value = None + runner._reasoning_config = None + runner._provider_routing = {} + runner._fallback_model = None + runner._show_reasoning = False + runner._is_user_authorized = lambda _source: True + runner._set_session_env = lambda _context: None + runner._should_send_voice_reply = lambda *_args, **_kwargs: False + runner._send_voice_reply = AsyncMock() + runner._capture_gateway_honcho_if_configured = lambda *args, **kwargs: None + runner._emit_gateway_run_progress = AsyncMock() + return runner + + +# --------------------------------------------------------------------------- +# /whoami response shape — proves the handler is reachable AND uses the +# resolver. We use /whoami because it's deterministic and short-circuits +# before any session/agent setup. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_whoami_unrestricted_when_no_admin_list(): + runner = _make_runner(platform_extra={}) # no admin list + result = await runner._handle_message(_make_event("/whoami", _make_source(user_id="999"))) + assert "Tier: unrestricted" in result + assert "no admin list configured" in result + + +@pytest.mark.asyncio +async def test_whoami_admin_user(): + runner = _make_runner(platform_extra={"allow_admin_from": ["111"]}) + result = await runner._handle_message(_make_event("/whoami", _make_source(user_id="111"))) + assert "**admin**" in result + + +@pytest.mark.asyncio +async def test_whoami_non_admin_lists_runnable_commands(): + runner = _make_runner( + platform_extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": ["status", "model"], + } + ) + result = await runner._handle_message(_make_event("/whoami", _make_source(user_id="999"))) + assert "Tier: user" in result + assert "/help" in result # always-allowed floor + assert "/whoami" in result # always-allowed floor + assert "/status" in result + assert "/model" in result + + +# --------------------------------------------------------------------------- +# Gate denial — admin-only command attempted by non-admin +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_non_admin_denied_for_unlisted_command(): + runner = _make_runner( + platform_extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": ["status"], + } + ) + # /stop is NOT in user_allowed_commands and not in the always-allowed floor. + result = await runner._handle_message(_make_event("/stop", _make_source(user_id="999"))) + assert result is not None + assert "⛔" in result + assert "/stop is admin-only here" in result + assert "/status" in result # denial preview shows what they CAN run + + +@pytest.mark.asyncio +async def test_non_admin_with_empty_user_commands_gets_floor_only(): + runner = _make_runner( + platform_extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": [], # explicitly empty + } + ) + # /stop denied + result = await runner._handle_message(_make_event("/stop", _make_source(user_id="999"))) + assert "⛔" in result + assert "No slash commands are enabled" in result + # /whoami still works (always-allowed floor) + whoami_result = await runner._handle_message(_make_event("/whoami", _make_source(user_id="999"))) + assert "Tier: user" in whoami_result + + +# --------------------------------------------------------------------------- +# Gate ALLOW — admin and listed user +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_admin_runs_unlisted_command(): + runner = _make_runner( + platform_extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": [], # users can run nothing + } + ) + # Admin runs /whoami (proxy for "any command works"); the gate must NOT + # return the ⛔ denial. The /whoami handler is deterministic and doesn't + # need a real agent, so we can assert against its content. + result = await runner._handle_message(_make_event("/whoami", _make_source(user_id="111"))) + assert "⛔" not in result + assert "**admin**" in result + + +@pytest.mark.asyncio +async def test_user_runs_listed_command(): + runner = _make_runner( + platform_extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": ["whoami"], # explicit + } + ) + result = await runner._handle_message(_make_event("/whoami", _make_source(user_id="999"))) + assert "⛔" not in result + assert "Tier: user" in result + + +# --------------------------------------------------------------------------- +# Backward compatibility — no admin list set means no gating at all +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_backward_compat_no_admin_list_means_no_gate(): + runner = _make_runner(platform_extra={}) # nothing configured + # Random non-listed user runs /whoami; should return unrestricted profile, + # never a denial. + result = await runner._handle_message(_make_event("/whoami", _make_source(user_id="anyone"))) + assert "⛔" not in result + assert "Tier: unrestricted" in result + + +# --------------------------------------------------------------------------- +# Scope isolation — DM vs group +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_dm_admin_is_not_group_admin(): + runner = _make_runner( + platform_extra={ + "allow_admin_from": ["111"], + "group_allow_admin_from": ["222"], + "group_user_allowed_commands": [], + } + ) + # User 111 is DM admin. In group context they're a non-admin with no + # listed commands → /stop denied. + result = await runner._handle_message( + _make_event("/stop", _make_source(user_id="111", chat_type="group")) + ) + assert "⛔" in result + + +@pytest.mark.asyncio +async def test_group_only_gating_leaves_dm_unrestricted(): + runner = _make_runner( + platform_extra={ + # Only group has an admin list → DM scope stays in backward-compat mode + "group_allow_admin_from": ["222"], + } + ) + result = await runner._handle_message(_make_event("/whoami", _make_source(user_id="anyone", chat_type="dm"))) + assert "Tier: unrestricted" in result + + +# --------------------------------------------------------------------------- +# Plugin-registered slash commands are gated through the same path +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_plugin_registered_command_is_gated(monkeypatch): + """The gate must recognize plugin-registered slash commands, not just + built-in COMMAND_REGISTRY entries. We verify by stubbing + is_gateway_known_command and resolve_command so a fictitious /myplugin + command is treated as a known plugin command. + """ + runner = _make_runner( + platform_extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": [], + } + ) + + from hermes_cli import commands as cmd_mod + + real_resolve = cmd_mod.resolve_command + real_is_known = cmd_mod.is_gateway_known_command + + def fake_resolve(name): + if name == "myplugin": + # Return a CommandDef-like duck so canonical resolution succeeds + return SimpleNamespace(name="myplugin") + return real_resolve(name) + + def fake_is_known(name): + if name == "myplugin": + return True + return real_is_known(name) + + monkeypatch.setattr(cmd_mod, "resolve_command", fake_resolve) + monkeypatch.setattr(cmd_mod, "is_gateway_known_command", fake_is_known) + + # Non-admin tries to run the plugin command → must be denied by the gate. + result = await runner._handle_message( + _make_event("/myplugin foo bar", _make_source(user_id="999")) + ) + assert "⛔" in result + assert "/myplugin is admin-only here" in result + + +# --------------------------------------------------------------------------- +# Running-agent fast-path gating — admin/user split must hold even when an +# agent is already running. The fast-path block in _handle_message dispatches +# /stop, /restart, /new, /steer, /model, /approve, /deny, /agents, +# /background, /kanban, /goal, /yolo, /verbose, /footer, /help, /commands, +# /profile, /update directly without going through the cold dispatch site. +# We must apply the gate there too — otherwise non-admins could bypass +# gating just because an agent happens to be busy. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_running_agent_fastpath_blocks_non_admin_command(): + """When an agent is running, /restart from a non-admin must be denied.""" + runner = _make_runner( + platform_extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": [], + } + ) + src = _make_source(user_id="999") + # Mark the session as having an in-flight agent so the fast-path runs. + from gateway.session import build_session_key + sk = build_session_key(src) + runner._running_agents[sk] = MagicMock() + runner._running_agents_ts[sk] = 0 # not stale (epoch + small delta on this machine) + + result = await runner._handle_message(_make_event("/restart", src)) + assert result is not None + assert "⛔" in result + assert "/restart is admin-only here" in result + + +@pytest.mark.asyncio +async def test_running_agent_fastpath_allows_admin_command(): + """Admins must still be able to run privileged commands like /restart + through the running-agent fast-path. We check that we don't get the + denial message; the actual /restart handler is mocked out via the + runner's MagicMock.""" + runner = _make_runner( + platform_extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": [], + } + ) + src = _make_source(user_id="111") # admin + from gateway.session import build_session_key + sk = build_session_key(src) + runner._running_agents[sk] = MagicMock() + runner._running_agents_ts[sk] = 0 + # Mock the restart handler so it doesn't actually try to restart anything. + runner._handle_restart_command = AsyncMock(return_value="restart-handled") + + result = await runner._handle_message(_make_event("/restart", src)) + assert result == "restart-handled" + assert "⛔" not in (result or "") + + +@pytest.mark.asyncio +async def test_running_agent_fastpath_status_always_works(): + """/status is intentionally pre-gate on the fast-path so users can + always see session state, even non-admins.""" + runner = _make_runner( + platform_extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": [], + } + ) + src = _make_source(user_id="999") # non-admin + from gateway.session import build_session_key + sk = build_session_key(src) + runner._running_agents[sk] = MagicMock() + runner._running_agents_ts[sk] = 0 + runner._handle_status_command = AsyncMock(return_value="status-handled") + + result = await runner._handle_message(_make_event("/status", src)) + assert result == "status-handled" + assert "⛔" not in (result or "") + + +# --------------------------------------------------------------------------- +# Alias resolution — /h aliases to /help; the gate must canonicalize before +# checking access. /hist (history alias) is a real one to exercise. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_gate_uses_canonical_name_not_alias(): + """If /hist resolves to canonical 'history' and history is in + user_allowed_commands, the alias must be allowed too.""" + runner = _make_runner( + platform_extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": ["history"], + } + ) + # Find a real alias in the registry to use. + from hermes_cli.commands import COMMAND_REGISTRY + history_def = next(c for c in COMMAND_REGISTRY if c.name == "history") + # If /history has aliases, use one. Otherwise just use /history. + alias = history_def.aliases[0] if history_def.aliases else "history" + # Mock the history handler so we don't need real session state. + runner._handle_history_command = AsyncMock(return_value="history-handled") + result = await runner._handle_message(_make_event(f"/{alias}", _make_source(user_id="999"))) + assert "⛔" not in (result or "") + + +# --------------------------------------------------------------------------- +# Unknown / unregistered command — gate must NOT intercept (let the existing +# unknown-command path handle it normally). +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_gate_does_not_intercept_unknown_command(): + """Random non-command text like /xyzzy is not in the registry. The gate + must not produce a denial message — the existing unknown-command path + will handle it (or the agent will see it as plain text).""" + runner = _make_runner( + platform_extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": [], + } + ) + # /xyzzy is not in COMMAND_REGISTRY and not a plugin command. + # The gate should pass through (no ⛔) since canonical resolution + # returns the raw command and is_gateway_known_command returns False. + # We can only verify the gate didn't fire — downstream behavior may + # vary (returns None, agent processes it, etc.). What matters: no denial. + runner._handle_unknown_command = AsyncMock(return_value=None) + # Stub out the rest of the cold path to short-circuit + runner.session_store.get_or_create_session.side_effect = RuntimeError("would have proceeded past gate") + try: + await runner._handle_message(_make_event("/xyzzy", _make_source(user_id="999"))) + except RuntimeError as e: + # Reaching session creation means we got past the gate without a denial. + assert "would have proceeded past gate" in str(e) + + +# --------------------------------------------------------------------------- +# Scope independence — admin in DM scope is NOT auto-admin in group when +# group has its own admin list (regression guard for the "admin lists are +# scope-specific" rule). +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_dm_admin_blocked_in_group_with_separate_admin_list(): + runner = _make_runner( + platform_extra={ + "allow_admin_from": ["111"], # DM admin + "group_allow_admin_from": ["222"], # group admin + "group_user_allowed_commands": ["status"], + } + ) + # User 111 is DM admin. In a group, they're a non-admin and can only + # run group_user_allowed_commands. /restart is not in that list → denied. + grp_src = _make_source(user_id="111", chat_type="group", chat_id="g1") + result = await runner._handle_message(_make_event("/restart", grp_src)) + assert "⛔" in result + assert "/restart is admin-only here" in result + + +# --------------------------------------------------------------------------- +# Multi-platform isolation — gating on Discord doesn't leak to Telegram. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_gating_isolated_per_platform(): + """When Discord is gated and Telegram isn't, the same user_id on + Telegram must be unrestricted.""" + from gateway.run import GatewayRunner + from gateway.config import GatewayConfig, Platform, PlatformConfig + + runner = object.__new__(GatewayRunner) + runner.config = GatewayConfig( + platforms={ + Platform.DISCORD: PlatformConfig( + enabled=True, + token="***", + extra={ + "allow_admin_from": ["111"], + "user_allowed_commands": [], + }, + ), + Platform.TELEGRAM: PlatformConfig( + enabled=True, token="***", extra={} + ), + } + ) + runner.adapters = { + Platform.DISCORD: MagicMock(send=AsyncMock()), + Platform.TELEGRAM: MagicMock(send=AsyncMock()), + } + runner._voice_mode = {} + runner.hooks = SimpleNamespace( + emit=AsyncMock(), + emit_collect=AsyncMock(return_value=[]), + loaded_hooks=False, + ) + runner.session_store = MagicMock() + session_entry = SessionEntry( + session_key="agent:main:telegram:dm:c1", + session_id="sess-1", + created_at=datetime.now(), + updated_at=datetime.now(), + platform=Platform.TELEGRAM, + chat_type="dm", + total_tokens=0, + ) + runner.session_store.get_or_create_session.return_value = session_entry + runner.session_store.load_transcript.return_value = [] + runner.session_store.has_any_sessions.return_value = True + runner.session_store.append_to_transcript = MagicMock() + runner.session_store.rewrite_transcript = MagicMock() + runner.session_store.update_session = MagicMock() + runner._running_agents = {} + runner._running_agents_ts = {} + runner._session_run_generation = {} + runner._pending_messages = {} + runner._pending_approvals = {} + runner._session_sources = {} + runner._session_db = MagicMock() + runner._session_db.get_session_title.return_value = None + runner._session_db.get_session.return_value = None + runner._reasoning_config = None + runner._provider_routing = {} + runner._fallback_model = None + runner._show_reasoning = False + runner._is_user_authorized = lambda _source: True + runner._set_session_env = lambda _context: None + runner._should_send_voice_reply = lambda *_args, **_kwargs: False + runner._send_voice_reply = AsyncMock() + runner._capture_gateway_honcho_if_configured = lambda *args, **kwargs: None + runner._emit_gateway_run_progress = AsyncMock() + + # Same user_id on Telegram → must be unrestricted (Telegram has no admin list). + tg_src = _make_source(platform=Platform.TELEGRAM, user_id="999", chat_id="t1") + result = await runner._handle_message(_make_event("/whoami", tg_src)) + assert "Tier: unrestricted" in result diff --git a/website/docs/user-guide/messaging/discord.md b/website/docs/user-guide/messaging/discord.md index c8a2dbc5f67..375d682f92d 100644 --- a/website/docs/user-guide/messaging/discord.md +++ b/website/docs/user-guide/messaging/discord.md @@ -462,6 +462,48 @@ display: tool_progress_command: true ``` +## Slash Command Access Control + +By default, every allowed user can run every slash command. To split your allowlist into **admins** (full slash command access) and **regular users** (only commands you explicitly enable), add `allow_admin_from` and `user_allowed_commands` to the Discord platform's `extra` block: + +```yaml +gateway: + platforms: + discord: + extra: + # Existing user allowlist (unchanged) + allow_from: + - "123456789012345678" # admin user ID + - "999888777666555444" # regular user ID + + # NEW — admins get all slash commands (built-in + plugin) + allow_admin_from: + - "123456789012345678" + + # NEW — non-admin allowed users can only run these slash commands. + # /help and /whoami are always allowed so users can see their access. + user_allowed_commands: + - status + - model + - history + + # Optional: separate admin / command lists for server channels + group_allow_admin_from: + - "123456789012345678" + group_user_allowed_commands: + - status +``` + +**Behavior:** + +- A user in `allow_admin_from` for a scope (DM or server channel) can run **every** registered slash command — built-in AND plugin-registered — through the live command registry. +- A user not in `allow_admin_from` can only run commands listed in `user_allowed_commands`, plus the always-allowed floor: `/help` and `/whoami`. +- Plain chat (non-slash messages) is unaffected. Non-admin users can still talk to the agent normally; they just can't trigger arbitrary commands. +- **Backward compat:** if `allow_admin_from` is not set for a scope, slash command gating is disabled for that scope. Existing installs keep working with no changes. +- DM admin status does not imply server-channel admin status. Each scope has its own admin list. + +Use `/whoami` to see the active scope, your tier (admin / user / unrestricted), and which slash commands you can run. + ## Interactive Model Picker Send `/model` with no arguments in a Discord channel to open a dropdown-based model picker: diff --git a/website/docs/user-guide/messaging/index.md b/website/docs/user-guide/messaging/index.md index b8ac6fecb3b..acd12872812 100644 --- a/website/docs/user-guide/messaging/index.md +++ b/website/docs/user-guide/messaging/index.md @@ -134,6 +134,7 @@ hermes gateway status --system # Linux only: inspect the system service | `/retry` | Retry the last message | | `/undo` | Remove the last exchange | | `/status` | Show session info | +| `/whoami` | Show your slash command access on this scope (admin / user / unrestricted) | | `/stop` | Stop the running agent | | `/approve` | Approve a pending dangerous command | | `/deny` | Reject a pending dangerous command | @@ -221,6 +222,33 @@ hermes pairing revoke telegram 123456789 # Remove access Pairing codes expire after 1 hour, are rate-limited, and use cryptographic randomness. +### Slash Command Access Control + +Once users are allowed in, you can split them into **admins** (full slash command access) and **regular users** (only the slash commands you explicitly enable). This applies per platform and per scope (DM vs group/channel) and works through the live command registry, so it covers built-in AND plugin-registered slash commands without per-feature wiring. + +```yaml +gateway: + platforms: + discord: + extra: + allow_from: ["111", "222", "333"] + allow_admin_from: ["111"] # admins → all slash commands + user_allowed_commands: [status, model] # what non-admins may run + # Optional: separate group/channel scope + group_allow_admin_from: ["111"] + group_user_allowed_commands: [status] +``` + +Behavior: + +- A user in `allow_admin_from` for a scope can run **every** registered slash command. +- A user in `allow_from` but not in `allow_admin_from` can only run commands in `user_allowed_commands`, plus the always-allowed floor: `/help` and `/whoami`. +- Plain chat is unaffected. Non-admins can still talk to the agent normally; they just can't trigger arbitrary commands. +- **Backward compat:** if `allow_admin_from` is not set for a scope, slash gating is disabled for that scope. Existing installs keep working with no changes. +- DM admin status does not imply group/channel admin status. Each scope has its own admin list. + +Use `/whoami` from any platform to see the active scope, your tier (admin / user / unrestricted), and which slash commands you can run. See the [Telegram](/docs/user-guide/messaging/telegram#slash-command-access-control) and [Discord](/docs/user-guide/messaging/discord#slash-command-access-control) pages for platform-specific examples. + ## Interrupting the Agent Send any message while the agent is working to interrupt it. Key behaviors: diff --git a/website/docs/user-guide/messaging/telegram.md b/website/docs/user-guide/messaging/telegram.md index d41633e995d..ee6e09bfd77 100644 --- a/website/docs/user-guide/messaging/telegram.md +++ b/website/docs/user-guide/messaging/telegram.md @@ -685,6 +685,50 @@ TELEGRAM_GROUP_ALLOWED_USERS="-1001234567890" TELEGRAM_GROUP_ALLOWED_CHATS="-1001234567890" ``` +## Slash Command Access Control + +By default, every allowed user can run every slash command. To split your allowlist into **admins** (full slash command access) and **regular users** (only commands you explicitly enable), add `allow_admin_from` and `user_allowed_commands` to the platform's `extra` block: + +```yaml +gateway: + platforms: + telegram: + extra: + # Existing allowlists (unchanged) + allow_from: + - "123456789" # admin + - "555555555" # regular user + - "777777777" # regular user + + # NEW — admins get all slash commands (built-in + plugin) + allow_admin_from: + - "123456789" + + # NEW — non-admin allowed users can only run these slash commands. + # /help and /whoami are always allowed so users can see their access. + user_allowed_commands: + - status + - model + - history + + # Optional: separate admin/command lists for groups + group_allow_admin_from: + - "123456789" + group_user_allowed_commands: + - status +``` + +**Behavior:** + +- A user listed in `allow_admin_from` for a scope (DM or group) can run **every** registered slash command — built-in commands AND plugin-registered ones — through the live registry. +- A user in `allow_from` but **not** in `allow_admin_from` can only run commands listed in `user_allowed_commands`, plus the always-allowed floor: `/help` and `/whoami`. +- Plain chat (non-slash messages) is unaffected. Non-admin users can still talk to the agent normally, they just can't trigger arbitrary commands. +- **Backward compat:** if `allow_admin_from` is not set for a scope, slash command gating is disabled for that scope. Existing installs keep working with no changes. +- DM admin status does not imply group admin status. Each scope has its own admin list. +- If only `group_allow_admin_from` is set, DM scope stays in unrestricted (backward-compat) mode. + +Use `/whoami` to see the active scope, your tier (admin / user / unrestricted), and which slash commands you can run. + ## Interactive Model Picker When you send `/model` with no arguments in a Telegram chat, Hermes shows an interactive inline keyboard for switching models: