mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
* feat(gateway): per-platform admin/user split for slash commands Adds an opt-in two-list access control on top of the existing per-platform `allow_from` allowlists, scoped to slash commands only: - allow_admin_from — full slash command access - user_allowed_commands — what non-admins may run - group_allow_admin_from — same, group/channel scope - group_user_allowed_commands When `allow_admin_from` is unset for a scope, gating is disabled and every allowed user keeps full access (backward compat). Plain chat is unaffected. `/help` and `/whoami` are always reachable so users can see what they can run. Gate runs at the slash command dispatch site in gateway/run.py and uses `is_gateway_known_command()`, so it covers built-in AND plugin-registered commands through the live registry without per-feature wiring. Adds `/whoami` showing platform, scope, tier, and runnable commands. Salvage of PR #4443's permission tier work, scoped down. The full tier system, tool filtering, audit log, usage tracking, rate limiting, `/promote` flow, and persistent SQLite stores are not included here — those can be re-expanded later if needed. Co-authored-by: ReqX <mike@grossmann.at> * fix(gateway): close running-agent fast-path bypass + add coverage and central docs The slash command access gate was only applied at the cold dispatch site (line ~5921). When an agent was already running, the running-agent fast-path block (line ~5574) dispatched /restart, /stop, /new, /steer, /model, /approve, /deny, /agents, /background, /kanban, /goal, /yolo, /verbose, /footer, /help, /commands, /profile, /update directly without going through the gate — letting non-admins bypass gating just because an agent happens to be busy. Refactored the gate into _check_slash_access() and called from BOTH paths. /status remains intentionally pre-gate so users can always see session state. Also added 18 more dispatch tests covering: - Running-agent fast-path: blocks non-admin, allows admin, /status always works - Alias canonicalization (gate uses canonical name, not user alias) - Unknown / unregistered commands pass through (don't false-positive) - DM admin scope-locked when group has its own admin list - Multi-platform isolation (Discord gated, Telegram unrestricted) Docs: added Slash Command Access Control section to the central messaging index page + /whoami row in the chat commands table. Co-authored-by: ReqX <mike@grossmann.at> --------- Co-authored-by: ReqX <mike@grossmann.at>
229 lines
8.2 KiB
Python
229 lines
8.2 KiB
Python
"""Per-platform slash command access control.
|
|
|
|
This module sits beside the existing per-platform allowlist (``allow_from``)
|
|
and adds a second axis: of the users who are *allowed to talk to the
|
|
gateway*, which ones can run *which slash commands*.
|
|
|
|
Two lists per platform scope (DM vs group, mirroring ``allow_from`` vs
|
|
``group_allow_from``):
|
|
|
|
- ``allow_admin_from`` — user IDs that get every registered slash
|
|
command (built-in + plugin-registered).
|
|
- ``user_allowed_commands`` — slash command names non-admin users may
|
|
run. Empty / unset → non-admins get no
|
|
slash commands.
|
|
|
|
Backward compatibility:
|
|
|
|
If ``allow_admin_from`` is not set for a scope, slash command gating
|
|
is disabled entirely for that scope. Every allowed user can run every
|
|
slash command, exactly like before. This means existing installs are
|
|
unaffected until an operator opts in by listing at least one admin.
|
|
|
|
The gate is applied at the slash command dispatch site in
|
|
``gateway/run.py`` so it covers BOTH built-in and plugin-registered
|
|
commands via the live registry. Gating slash commands does not affect
|
|
plain chat — non-admin users can still talk to the agent normally,
|
|
they just can't trigger commands outside ``user_allowed_commands``.
|
|
|
|
Authored as a slimmed-down salvage of PR #4443's permission tiers
|
|
(co-authored by @ReqX). The full tier system, audit log, usage
|
|
tracking, rate limiting, and tool filtering from that PR are not
|
|
included here — only the slash-command access split.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any, FrozenSet, Iterable, Optional, Tuple
|
|
|
|
|
|
# Slash commands that MUST stay reachable for any allowed user, even when
|
|
# slash gating is enabled and the user has no commands listed. Without this
|
|
# carve-out, a non-admin user has no way to discover what they can or
|
|
# can't do (``/help``, ``/whoami``) and no way to see what state the agent
|
|
# is in (``/status``). These mirror the smallest set of read-only commands
|
|
# we'd hand to a guest. Operators can still narrow this further by writing
|
|
# their own ``user_allowed_commands`` (this set is only the implicit
|
|
# fallback floor — anything in ``user_allowed_commands`` overrides it
|
|
# additively, never restrictively).
|
|
_ALWAYS_ALLOWED_FOR_USERS: FrozenSet[str] = frozenset({
|
|
"help",
|
|
"whoami",
|
|
})
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class SlashAccessPolicy:
|
|
"""Resolved access policy for a single (platform, scope) pair.
|
|
|
|
``scope`` is ``"dm"`` for direct messages and ``"group"`` for groups,
|
|
channels, threads, and any other multi-user context. The mapping from
|
|
SessionSource.chat_type → scope happens in ``policy_for_source``.
|
|
"""
|
|
|
|
enabled: bool # gating active for this scope?
|
|
admin_user_ids: FrozenSet[str]
|
|
user_allowed_commands: FrozenSet[str]
|
|
|
|
def is_admin(self, user_id: Optional[str]) -> bool:
|
|
if not self.enabled:
|
|
# Gating disabled → treat every allowed user as admin so
|
|
# downstream code can keep using ``is_admin`` / ``can_run``
|
|
# uniformly.
|
|
return True
|
|
if not user_id:
|
|
return False
|
|
return str(user_id) in self.admin_user_ids
|
|
|
|
def can_run(self, user_id: Optional[str], canonical_cmd: str) -> bool:
|
|
if not self.enabled:
|
|
return True
|
|
if self.is_admin(user_id):
|
|
return True
|
|
if not canonical_cmd:
|
|
return False
|
|
if canonical_cmd in _ALWAYS_ALLOWED_FOR_USERS:
|
|
return True
|
|
return canonical_cmd in self.user_allowed_commands
|
|
|
|
|
|
_DM_CHAT_TYPES = frozenset({"dm", "direct", "private", ""})
|
|
|
|
|
|
def _coerce_id_list(raw: Any) -> FrozenSet[str]:
|
|
"""Normalize a YAML-loaded admin/user list into a frozenset of strings.
|
|
|
|
Accepts ``None``, list, tuple, or comma-separated string. Stringifies
|
|
each entry and strips whitespace; empty entries are dropped.
|
|
"""
|
|
if raw is None:
|
|
return frozenset()
|
|
if isinstance(raw, (list, tuple, set, frozenset)):
|
|
items: Iterable[Any] = raw
|
|
elif isinstance(raw, str):
|
|
items = (s for s in raw.split(",") if s.strip())
|
|
else:
|
|
# single scalar (int user id, etc.)
|
|
items = (raw,)
|
|
out: list[str] = []
|
|
for it in items:
|
|
s = str(it).strip()
|
|
if s:
|
|
out.append(s)
|
|
return frozenset(out)
|
|
|
|
|
|
def _coerce_command_list(raw: Any) -> FrozenSet[str]:
|
|
"""Normalize a slash command allowlist.
|
|
|
|
Strips leading slashes so YAML can read either ``["help", "status"]``
|
|
or ``["/help", "/status"]``. Lowercase canonicalization matches how
|
|
``resolve_command()`` stores names.
|
|
"""
|
|
if raw is None:
|
|
return frozenset()
|
|
if isinstance(raw, (list, tuple, set, frozenset)):
|
|
items: Iterable[Any] = raw
|
|
elif isinstance(raw, str):
|
|
items = (s for s in raw.split(",") if s.strip())
|
|
else:
|
|
items = (raw,)
|
|
out: list[str] = []
|
|
for it in items:
|
|
s = str(it).strip().lstrip("/").lower()
|
|
if s:
|
|
out.append(s)
|
|
return frozenset(out)
|
|
|
|
|
|
def _scope_for_chat_type(chat_type: Optional[str]) -> str:
|
|
if chat_type and chat_type.lower() in _DM_CHAT_TYPES:
|
|
return "dm"
|
|
return "group"
|
|
|
|
|
|
def _platform_extra(platform_config: Any) -> dict:
|
|
"""Return the ``extra`` dict from a PlatformConfig-like object.
|
|
|
|
Defensively handles None and non-PlatformConfig shapes so calling
|
|
code can stay simple.
|
|
"""
|
|
if platform_config is None:
|
|
return {}
|
|
extra = getattr(platform_config, "extra", None)
|
|
if isinstance(extra, dict):
|
|
return extra
|
|
if isinstance(platform_config, dict):
|
|
# Some test harnesses pass dicts directly.
|
|
return platform_config
|
|
return {}
|
|
|
|
|
|
def _keys_for_scope(scope: str) -> Tuple[str, str]:
|
|
"""Return (admin_key, user_cmd_key) names for a scope."""
|
|
if scope == "group":
|
|
return ("group_allow_admin_from", "group_user_allowed_commands")
|
|
return ("allow_admin_from", "user_allowed_commands")
|
|
|
|
|
|
def policy_from_extra(extra: dict, scope: str) -> SlashAccessPolicy:
|
|
"""Build a policy from a platform's ``extra`` dict for one scope.
|
|
|
|
DM scope falls back to group scope keys ONLY for ``user_allowed_commands``
|
|
when the DM scope didn't specify its own. This keeps the common case
|
|
(operator wants the same command set DM and group) ergonomic without
|
|
forcing duplication. Admin lists are NOT cross-scope: an admin in
|
|
DMs is not implicitly an admin in a group.
|
|
"""
|
|
admin_key, cmd_key = _keys_for_scope(scope)
|
|
admin_ids = _coerce_id_list(extra.get(admin_key))
|
|
cmds = _coerce_command_list(extra.get(cmd_key))
|
|
|
|
if scope == "dm" and not cmds:
|
|
# DM didn't specify — let group's user_allowed_commands fall through
|
|
# so operators only need to list it once if it's the same.
|
|
cmds = _coerce_command_list(extra.get("group_user_allowed_commands"))
|
|
|
|
enabled = bool(admin_ids)
|
|
return SlashAccessPolicy(
|
|
enabled=enabled,
|
|
admin_user_ids=admin_ids,
|
|
user_allowed_commands=cmds,
|
|
)
|
|
|
|
|
|
def policy_for_source(gateway_config: Any, source: Any) -> SlashAccessPolicy:
|
|
"""Resolve the access policy for a SessionSource.
|
|
|
|
Returns a "disabled" policy (gating off, allow everything) when:
|
|
- gateway_config is None
|
|
- the platform has no PlatformConfig
|
|
- the platform's PlatformConfig has no admin list set for the scope
|
|
|
|
Callers should treat the returned policy as authoritative for slash
|
|
command gating only. It does not gate plain chat messages.
|
|
"""
|
|
if gateway_config is None or source is None:
|
|
return SlashAccessPolicy(
|
|
enabled=False,
|
|
admin_user_ids=frozenset(),
|
|
user_allowed_commands=frozenset(),
|
|
)
|
|
platforms = getattr(gateway_config, "platforms", None)
|
|
platform_config = None
|
|
if platforms is not None:
|
|
try:
|
|
platform_config = platforms.get(source.platform)
|
|
except Exception:
|
|
platform_config = None
|
|
extra = _platform_extra(platform_config)
|
|
scope = _scope_for_chat_type(getattr(source, "chat_type", None))
|
|
return policy_from_extra(extra, scope)
|
|
|
|
|
|
__all__ = [
|
|
"SlashAccessPolicy",
|
|
"policy_from_extra",
|
|
"policy_for_source",
|
|
]
|