hermes-agent/gateway/slash_access.py
Teknium a282434301
feat(gateway): per-platform admin/user split for slash commands (salvage of #4443) (#23373)
* feat(gateway): per-platform admin/user split for slash commands

Adds an opt-in two-list access control on top of the existing per-platform
`allow_from` allowlists, scoped to slash commands only:

  - allow_admin_from         — full slash command access
  - user_allowed_commands    — what non-admins may run
  - group_allow_admin_from   — same, group/channel scope
  - group_user_allowed_commands

When `allow_admin_from` is unset for a scope, gating is disabled and every
allowed user keeps full access (backward compat). Plain chat is unaffected.
`/help` and `/whoami` are always reachable so users can see what they
can run.

Gate runs at the slash command dispatch site in gateway/run.py and uses
`is_gateway_known_command()`, so it covers built-in AND plugin-registered
commands through the live registry without per-feature wiring.

Adds `/whoami` showing platform, scope, tier, and runnable commands.

Salvage of PR #4443's permission tier work, scoped down. The full tier
system, tool filtering, audit log, usage tracking, rate limiting,
`/promote` flow, and persistent SQLite stores are not included here —
those can be re-expanded later if needed.

Co-authored-by: ReqX <mike@grossmann.at>

* fix(gateway): close running-agent fast-path bypass + add coverage and central docs

The slash command access gate was only applied at the cold dispatch site
(line ~5921). When an agent was already running, the running-agent
fast-path block (line ~5574) dispatched /restart, /stop, /new, /steer,
/model, /approve, /deny, /agents, /background, /kanban, /goal, /yolo,
/verbose, /footer, /help, /commands, /profile, /update directly
without going through the gate — letting non-admins bypass gating just
because an agent happens to be busy.

Refactored the gate into _check_slash_access() and called from BOTH
paths. /status remains intentionally pre-gate so users can always see
session state.

Also added 18 more dispatch tests covering:
  - Running-agent fast-path: blocks non-admin, allows admin, /status
    always works
  - Alias canonicalization (gate uses canonical name, not user alias)
  - Unknown / unregistered commands pass through (don't false-positive)
  - DM admin scope-locked when group has its own admin list
  - Multi-platform isolation (Discord gated, Telegram unrestricted)

Docs: added Slash Command Access Control section to the central
messaging index page + /whoami row in the chat commands table.

Co-authored-by: ReqX <mike@grossmann.at>

---------

Co-authored-by: ReqX <mike@grossmann.at>
2026-05-10 12:33:54 -07:00

229 lines
8.2 KiB
Python

"""Per-platform slash command access control.
This module sits beside the existing per-platform allowlist (``allow_from``)
and adds a second axis: of the users who are *allowed to talk to the
gateway*, which ones can run *which slash commands*.
Two lists per platform scope (DM vs group, mirroring ``allow_from`` vs
``group_allow_from``):
- ``allow_admin_from`` — user IDs that get every registered slash
command (built-in + plugin-registered).
- ``user_allowed_commands`` — slash command names non-admin users may
run. Empty / unset → non-admins get no
slash commands.
Backward compatibility:
If ``allow_admin_from`` is not set for a scope, slash command gating
is disabled entirely for that scope. Every allowed user can run every
slash command, exactly like before. This means existing installs are
unaffected until an operator opts in by listing at least one admin.
The gate is applied at the slash command dispatch site in
``gateway/run.py`` so it covers BOTH built-in and plugin-registered
commands via the live registry. Gating slash commands does not affect
plain chat — non-admin users can still talk to the agent normally,
they just can't trigger commands outside ``user_allowed_commands``.
Authored as a slimmed-down salvage of PR #4443's permission tiers
(co-authored by @ReqX). The full tier system, audit log, usage
tracking, rate limiting, and tool filtering from that PR are not
included here — only the slash-command access split.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, FrozenSet, Iterable, Optional, Tuple
# Slash commands that MUST stay reachable for any allowed user, even when
# slash gating is enabled and the user has no commands listed. Without this
# carve-out, a non-admin user has no way to discover what they can or
# can't do (``/help``, ``/whoami``) and no way to see what state the agent
# is in (``/status``). These mirror the smallest set of read-only commands
# we'd hand to a guest. Operators can still narrow this further by writing
# their own ``user_allowed_commands`` (this set is only the implicit
# fallback floor — anything in ``user_allowed_commands`` overrides it
# additively, never restrictively).
_ALWAYS_ALLOWED_FOR_USERS: FrozenSet[str] = frozenset({
"help",
"whoami",
})
@dataclass(frozen=True)
class SlashAccessPolicy:
"""Resolved access policy for a single (platform, scope) pair.
``scope`` is ``"dm"`` for direct messages and ``"group"`` for groups,
channels, threads, and any other multi-user context. The mapping from
SessionSource.chat_type → scope happens in ``policy_for_source``.
"""
enabled: bool # gating active for this scope?
admin_user_ids: FrozenSet[str]
user_allowed_commands: FrozenSet[str]
def is_admin(self, user_id: Optional[str]) -> bool:
if not self.enabled:
# Gating disabled → treat every allowed user as admin so
# downstream code can keep using ``is_admin`` / ``can_run``
# uniformly.
return True
if not user_id:
return False
return str(user_id) in self.admin_user_ids
def can_run(self, user_id: Optional[str], canonical_cmd: str) -> bool:
if not self.enabled:
return True
if self.is_admin(user_id):
return True
if not canonical_cmd:
return False
if canonical_cmd in _ALWAYS_ALLOWED_FOR_USERS:
return True
return canonical_cmd in self.user_allowed_commands
_DM_CHAT_TYPES = frozenset({"dm", "direct", "private", ""})
def _coerce_id_list(raw: Any) -> FrozenSet[str]:
"""Normalize a YAML-loaded admin/user list into a frozenset of strings.
Accepts ``None``, list, tuple, or comma-separated string. Stringifies
each entry and strips whitespace; empty entries are dropped.
"""
if raw is None:
return frozenset()
if isinstance(raw, (list, tuple, set, frozenset)):
items: Iterable[Any] = raw
elif isinstance(raw, str):
items = (s for s in raw.split(",") if s.strip())
else:
# single scalar (int user id, etc.)
items = (raw,)
out: list[str] = []
for it in items:
s = str(it).strip()
if s:
out.append(s)
return frozenset(out)
def _coerce_command_list(raw: Any) -> FrozenSet[str]:
"""Normalize a slash command allowlist.
Strips leading slashes so YAML can read either ``["help", "status"]``
or ``["/help", "/status"]``. Lowercase canonicalization matches how
``resolve_command()`` stores names.
"""
if raw is None:
return frozenset()
if isinstance(raw, (list, tuple, set, frozenset)):
items: Iterable[Any] = raw
elif isinstance(raw, str):
items = (s for s in raw.split(",") if s.strip())
else:
items = (raw,)
out: list[str] = []
for it in items:
s = str(it).strip().lstrip("/").lower()
if s:
out.append(s)
return frozenset(out)
def _scope_for_chat_type(chat_type: Optional[str]) -> str:
if chat_type and chat_type.lower() in _DM_CHAT_TYPES:
return "dm"
return "group"
def _platform_extra(platform_config: Any) -> dict:
"""Return the ``extra`` dict from a PlatformConfig-like object.
Defensively handles None and non-PlatformConfig shapes so calling
code can stay simple.
"""
if platform_config is None:
return {}
extra = getattr(platform_config, "extra", None)
if isinstance(extra, dict):
return extra
if isinstance(platform_config, dict):
# Some test harnesses pass dicts directly.
return platform_config
return {}
def _keys_for_scope(scope: str) -> Tuple[str, str]:
"""Return (admin_key, user_cmd_key) names for a scope."""
if scope == "group":
return ("group_allow_admin_from", "group_user_allowed_commands")
return ("allow_admin_from", "user_allowed_commands")
def policy_from_extra(extra: dict, scope: str) -> SlashAccessPolicy:
"""Build a policy from a platform's ``extra`` dict for one scope.
DM scope falls back to group scope keys ONLY for ``user_allowed_commands``
when the DM scope didn't specify its own. This keeps the common case
(operator wants the same command set DM and group) ergonomic without
forcing duplication. Admin lists are NOT cross-scope: an admin in
DMs is not implicitly an admin in a group.
"""
admin_key, cmd_key = _keys_for_scope(scope)
admin_ids = _coerce_id_list(extra.get(admin_key))
cmds = _coerce_command_list(extra.get(cmd_key))
if scope == "dm" and not cmds:
# DM didn't specify — let group's user_allowed_commands fall through
# so operators only need to list it once if it's the same.
cmds = _coerce_command_list(extra.get("group_user_allowed_commands"))
enabled = bool(admin_ids)
return SlashAccessPolicy(
enabled=enabled,
admin_user_ids=admin_ids,
user_allowed_commands=cmds,
)
def policy_for_source(gateway_config: Any, source: Any) -> SlashAccessPolicy:
"""Resolve the access policy for a SessionSource.
Returns a "disabled" policy (gating off, allow everything) when:
- gateway_config is None
- the platform has no PlatformConfig
- the platform's PlatformConfig has no admin list set for the scope
Callers should treat the returned policy as authoritative for slash
command gating only. It does not gate plain chat messages.
"""
if gateway_config is None or source is None:
return SlashAccessPolicy(
enabled=False,
admin_user_ids=frozenset(),
user_allowed_commands=frozenset(),
)
platforms = getattr(gateway_config, "platforms", None)
platform_config = None
if platforms is not None:
try:
platform_config = platforms.get(source.platform)
except Exception:
platform_config = None
extra = _platform_extra(platform_config)
scope = _scope_for_chat_type(getattr(source, "chat_type", None))
return policy_from_extra(extra, scope)
__all__ = [
"SlashAccessPolicy",
"policy_from_extra",
"policy_for_source",
]