diff --git a/hermes_cli/plugins_cmd.py b/hermes_cli/plugins_cmd.py index 0a5aa8c0fd0..fc66810489e 100644 --- a/hermes_cli/plugins_cmd.py +++ b/hermes_cli/plugins_cmd.py @@ -769,37 +769,135 @@ def _resolve_plugin_key(name: str) -> Optional[str]: return None -def cmd_enable(name: str) -> None: - """Add a plugin to the enabled allow-list (and remove it from disabled).""" +def _resolve_plugin_key_and_source(name: str) -> Optional[tuple]: + """Resolve *name* to ``(canonical_key, source)`` or ``None`` if no match. + + Mirrors :func:`_resolve_plugin_key`'s normalization but also returns the + plugin's source (``"bundled"``, ``"user"``, ``"project"``, ...) so the + enable path can tell whether a built-in-override consent prompt is needed. + """ + entries = _discover_all_plugins() + for entry in entries: + # entry = (name, version, description, source, dir_path, key) + if name == entry[5] or name == entry[0]: + return (entry[5], entry[3]) + leaf_matches = [ + (entry[5], entry[3]) for entry in entries + if name == entry[5].split("/")[-1] + ] + if len(leaf_matches) == 1: + return leaf_matches[0] + return None + + +def _set_plugin_entry_flag(plugin_id: str, key: str, value: bool) -> None: + """Write ``plugins.entries.. = value`` into config.yaml.""" + from hermes_cli.config import load_config, save_config + config = load_config() + plugins_cfg = config.setdefault("plugins", {}) + if not isinstance(plugins_cfg, dict): + plugins_cfg = {} + config["plugins"] = plugins_cfg + entries = plugins_cfg.setdefault("entries", {}) + if not isinstance(entries, dict): + entries = {} + plugins_cfg["entries"] = entries + entry = entries.setdefault(plugin_id, {}) + if not isinstance(entry, dict): + entry = {} + entries[plugin_id] = entry + entry[key] = bool(value) + save_config(config) + + +def cmd_enable(name: str, allow_tool_override: Optional[bool] = None) -> None: + """Add a plugin to the enabled allow-list (and remove it from disabled). + + For non-bundled plugins, prompt the operator about granting the + privileged ``allow_tool_override`` capability (replacing built-in tools + like ``shell_exec`` / ``write_file``). ``allow_tool_override`` is a + tri-state: ``True`` grants without prompting, ``False`` declines without + prompting, ``None`` (default) asks interactively. Bundled plugins are + trusted and never prompted. + """ from rich.console import Console console = Console() # Discover the plugin — check installed (user) AND bundled, including # nested category plugins — and normalize to its canonical registry key. - key = _resolve_plugin_key(name) - if key is None: + resolved = _resolve_plugin_key_and_source(name) + if resolved is None: console.print(f"[red]Plugin '{name}' is not installed or bundled.[/red]") sys.exit(1) + key, source = resolved enabled = _get_enabled_set() disabled = _get_disabled_set() - if key in enabled and key not in disabled: + already_enabled = key in enabled and key not in disabled + + if not already_enabled: + enabled.add(key) + disabled.discard(key) + # Drop any legacy bare-name entry so the two don't drift out of sync. + bare = key.split("/")[-1] + if bare != key: + disabled.discard(bare) + _save_enabled_set(enabled) + _save_disabled_set(disabled) + console.print( + f"[green]✓[/green] Plugin [bold]{key}[/bold] enabled. " + "Takes effect on next session." + ) + else: console.print(f"[dim]Plugin '{key}' is already enabled.[/dim]") + + # Built-in tool override is a privileged grant. Bundled plugins ship with + # Hermes core and are trusted; every other source needs operator opt-in. + if source == "bundled": return - enabled.add(key) - disabled.discard(key) - # Drop any legacy bare-name entry so the two don't drift out of sync. - bare = key.split("/")[-1] - if bare != key: - disabled.discard(bare) - _save_enabled_set(enabled) - _save_disabled_set(disabled) - console.print( - f"[green]✓[/green] Plugin [bold]{key}[/bold] enabled. " - "Takes effect on next session." - ) + _resolve_tool_override_grant(console, key, allow_tool_override) + + +def _resolve_tool_override_grant( + console, key: str, allow_tool_override: Optional[bool] +) -> None: + """Resolve and persist the ``allow_tool_override`` grant for a plugin. + + ``allow_tool_override`` tri-state: True grants, False declines, None + prompts interactively (defaulting to deny on a non-interactive stdin). + """ + if allow_tool_override is None: + # Interactive consent. Default to NO so a blind Enter doesn't grant + # a privileged capability, and a non-interactive stdin denies safely. + prompt = ( + "[yellow]Allow this plugin to replace built-in tools " + "(e.g. shell_exec, write_file)?[/yellow]\n" + " This is a privileged capability: an override can intercept " + "everything the agent routes through that tool.\n" + " Grant it? [y/N] " + ) + try: + answer = console.input(prompt).strip().lower() + except (EOFError, KeyboardInterrupt): + answer = "" + allow_tool_override = answer in {"y", "yes"} + + plugin_id = key + _set_plugin_entry_flag(plugin_id, "allow_tool_override", allow_tool_override) + if allow_tool_override: + console.print( + f"[green]✓[/green] Granted [bold]{key}[/bold] permission to " + "override built-in tools " + f"([dim]plugins.entries.{plugin_id}.allow_tool_override: true[/dim])." + ) + else: + console.print( + f"[dim]{key} may not override built-in tools. Re-run " + f"`hermes plugins enable {key} --allow-tool-override` to grant " + "this later.[/dim]" + ) def cmd_disable(name: str) -> None: @@ -1821,7 +1919,14 @@ def plugins_command(args) -> None: elif action in {"remove", "rm", "uninstall"}: cmd_remove(args.name) elif action == "enable": - cmd_enable(args.name) + # Tri-state: --allow-tool-override=True, --no-allow-tool-override=False, + # neither=None (interactive prompt for non-bundled plugins). + allow_override = None + if getattr(args, "allow_tool_override", False): + allow_override = True + elif getattr(args, "no_allow_tool_override", False): + allow_override = False + cmd_enable(args.name, allow_tool_override=allow_override) elif action == "disable": cmd_disable(args.name) elif action in {"list", "ls"}: diff --git a/hermes_cli/subcommands/plugins.py b/hermes_cli/subcommands/plugins.py index f5211ee5e86..5355fbec342 100644 --- a/hermes_cli/subcommands/plugins.py +++ b/hermes_cli/subcommands/plugins.py @@ -86,6 +86,18 @@ def build_plugins_parser(subparsers, *, cmd_plugins: Callable) -> None: "enable", help="Enable a disabled plugin" ) plugins_enable.add_argument("name", help="Plugin name to enable") + _enable_override_group = plugins_enable.add_mutually_exclusive_group() + _enable_override_group.add_argument( + "--allow-tool-override", + action="store_true", + help="Grant this plugin permission to replace built-in tools " + "(e.g. shell_exec, write_file). Skips the confirmation prompt.", + ) + _enable_override_group.add_argument( + "--no-allow-tool-override", + action="store_true", + help="Enable without granting built-in tool override (skip prompt).", + ) plugins_disable = plugins_subparsers.add_parser( "disable", help="Disable a plugin without removing it" diff --git a/tests/hermes_cli/test_plugins_cmd_enable_disable_nested.py b/tests/hermes_cli/test_plugins_cmd_enable_disable_nested.py index 427647095aa..b001c32d5c6 100644 --- a/tests/hermes_cli/test_plugins_cmd_enable_disable_nested.py +++ b/tests/hermes_cli/test_plugins_cmd_enable_disable_nested.py @@ -114,7 +114,7 @@ class TestEnableDisableNested: mock_user.return_value = nested_plugin_env mock_bundled.return_value = nested_plugin_env / "nonexistent" - cmd_enable("nemo_relay") # bare name + cmd_enable("nemo_relay", allow_tool_override=False) # bare name saved = mock_save_en.call_args[0][0] # The canonical key — NOT the bare name — must be persisted, because @@ -136,7 +136,7 @@ class TestEnableDisableNested: mock_user.return_value = nested_plugin_env mock_bundled.return_value = nested_plugin_env / "nonexistent" - cmd_enable("observability/nemo_relay") + cmd_enable("observability/nemo_relay", allow_tool_override=False) saved = mock_save_en.call_args[0][0] assert "observability/nemo_relay" in saved @@ -188,6 +188,153 @@ class TestEnableDisableNested: mock_user.return_value = nested_plugin_env mock_bundled.return_value = nested_plugin_env / "nonexistent" - cmd_enable("disk-cleanup") + cmd_enable("disk-cleanup", allow_tool_override=False) saved = mock_save_en.call_args[0][0] assert "disk-cleanup" in saved + + +# --------------------------------------------------------------------------- +# cmd_enable — built-in tool override consent (issue #29249) +# --------------------------------------------------------------------------- + + +class TestEnableToolOverrideConsent: + """Enabling a non-bundled plugin must surface a consent decision about the + privileged ``allow_tool_override`` capability, and persist the operator's + choice under ``plugins.entries..allow_tool_override``.""" + + @patch("hermes_cli.plugins.get_bundled_plugins_dir") + @patch("hermes_cli.plugins_cmd._plugins_dir") + @patch("hermes_cli.plugins_cmd._set_plugin_entry_flag") + @patch("hermes_cli.plugins_cmd._save_disabled_set") + @patch("hermes_cli.plugins_cmd._save_enabled_set") + @patch("hermes_cli.plugins_cmd._get_disabled_set", return_value=set()) + @patch("hermes_cli.plugins_cmd._get_enabled_set", return_value=set()) + def test_flag_true_grants_override_without_prompt( + self, mock_en, mock_dis, mock_save_en, mock_save_dis, mock_set_flag, + mock_user, mock_bundled, nested_plugin_env, + ): + from hermes_cli.plugins_cmd import cmd_enable + mock_user.return_value = nested_plugin_env + mock_bundled.return_value = nested_plugin_env / "nonexistent" + + cmd_enable("disk-cleanup", allow_tool_override=True) + + mock_set_flag.assert_called_once_with( + "disk-cleanup", "allow_tool_override", True + ) + + @patch("hermes_cli.plugins.get_bundled_plugins_dir") + @patch("hermes_cli.plugins_cmd._plugins_dir") + @patch("hermes_cli.plugins_cmd._set_plugin_entry_flag") + @patch("hermes_cli.plugins_cmd._save_disabled_set") + @patch("hermes_cli.plugins_cmd._save_enabled_set") + @patch("hermes_cli.plugins_cmd._get_disabled_set", return_value=set()) + @patch("hermes_cli.plugins_cmd._get_enabled_set", return_value=set()) + def test_flag_false_declines_override_without_prompt( + self, mock_en, mock_dis, mock_save_en, mock_save_dis, mock_set_flag, + mock_user, mock_bundled, nested_plugin_env, + ): + from hermes_cli.plugins_cmd import cmd_enable + mock_user.return_value = nested_plugin_env + mock_bundled.return_value = nested_plugin_env / "nonexistent" + + cmd_enable("disk-cleanup", allow_tool_override=False) + + mock_set_flag.assert_called_once_with( + "disk-cleanup", "allow_tool_override", False + ) + + @patch("hermes_cli.plugins.get_bundled_plugins_dir") + @patch("hermes_cli.plugins_cmd._plugins_dir") + @patch("hermes_cli.plugins_cmd._set_plugin_entry_flag") + @patch("hermes_cli.plugins_cmd._save_disabled_set") + @patch("hermes_cli.plugins_cmd._save_enabled_set") + @patch("hermes_cli.plugins_cmd._get_disabled_set", return_value=set()) + @patch("hermes_cli.plugins_cmd._get_enabled_set", return_value=set()) + def test_interactive_yes_grants_override( + self, mock_en, mock_dis, mock_save_en, mock_save_dis, mock_set_flag, + mock_user, mock_bundled, nested_plugin_env, + ): + from hermes_cli.plugins_cmd import cmd_enable + mock_user.return_value = nested_plugin_env + mock_bundled.return_value = nested_plugin_env / "nonexistent" + + with patch("rich.console.Console.input", return_value="y"): + cmd_enable("disk-cleanup") # no flag -> prompt + + mock_set_flag.assert_called_once_with( + "disk-cleanup", "allow_tool_override", True + ) + + @patch("hermes_cli.plugins.get_bundled_plugins_dir") + @patch("hermes_cli.plugins_cmd._plugins_dir") + @patch("hermes_cli.plugins_cmd._set_plugin_entry_flag") + @patch("hermes_cli.plugins_cmd._save_disabled_set") + @patch("hermes_cli.plugins_cmd._save_enabled_set") + @patch("hermes_cli.plugins_cmd._get_disabled_set", return_value=set()) + @patch("hermes_cli.plugins_cmd._get_enabled_set", return_value=set()) + def test_interactive_blank_enter_defaults_to_deny( + self, mock_en, mock_dis, mock_save_en, mock_save_dis, mock_set_flag, + mock_user, mock_bundled, nested_plugin_env, + ): + """A blind Enter must NOT grant a privileged capability.""" + from hermes_cli.plugins_cmd import cmd_enable + mock_user.return_value = nested_plugin_env + mock_bundled.return_value = nested_plugin_env / "nonexistent" + + with patch("rich.console.Console.input", return_value=""): + cmd_enable("disk-cleanup") + + mock_set_flag.assert_called_once_with( + "disk-cleanup", "allow_tool_override", False + ) + + @patch("hermes_cli.plugins.get_bundled_plugins_dir") + @patch("hermes_cli.plugins_cmd._plugins_dir") + @patch("hermes_cli.plugins_cmd._set_plugin_entry_flag") + @patch("hermes_cli.plugins_cmd._save_disabled_set") + @patch("hermes_cli.plugins_cmd._save_enabled_set") + @patch("hermes_cli.plugins_cmd._get_disabled_set", return_value=set()) + @patch("hermes_cli.plugins_cmd._get_enabled_set", return_value=set()) + def test_interactive_eof_defaults_to_deny( + self, mock_en, mock_dis, mock_save_en, mock_save_dis, mock_set_flag, + mock_user, mock_bundled, nested_plugin_env, + ): + """Non-interactive stdin (EOFError) must fail closed to deny.""" + from hermes_cli.plugins_cmd import cmd_enable + mock_user.return_value = nested_plugin_env + mock_bundled.return_value = nested_plugin_env / "nonexistent" + + with patch("rich.console.Console.input", side_effect=EOFError): + cmd_enable("disk-cleanup") + + mock_set_flag.assert_called_once_with( + "disk-cleanup", "allow_tool_override", False + ) + + @patch("hermes_cli.plugins.get_bundled_plugins_dir") + @patch("hermes_cli.plugins_cmd._plugins_dir") + @patch("hermes_cli.plugins_cmd._set_plugin_entry_flag") + @patch("hermes_cli.plugins_cmd._save_disabled_set") + @patch("hermes_cli.plugins_cmd._save_enabled_set") + @patch("hermes_cli.plugins_cmd._get_disabled_set", return_value=set()) + @patch("hermes_cli.plugins_cmd._get_enabled_set", return_value=set()) + def test_bundled_plugin_never_prompts_or_writes_entry( + self, mock_en, mock_dis, mock_save_en, mock_save_dis, mock_set_flag, + mock_user, mock_bundled, tmp_path, + ): + """Bundled plugins are trusted — no consent prompt, no entry write.""" + from hermes_cli.plugins_cmd import cmd_enable + # Bundled dir holds the plugin; user dir is empty. + _make_plugin_dir(tmp_path / "bundled", "trusted_bundled", { + "name": "trusted_bundled", "version": "1.0.0", + }) + mock_user.return_value = tmp_path / "empty" + mock_bundled.return_value = tmp_path / "bundled" + + # Console.input would raise if called — proving no prompt fired. + with patch("rich.console.Console.input", side_effect=AssertionError("prompted")): + cmd_enable("trusted_bundled") + + mock_set_flag.assert_not_called()