diff --git a/cli.py b/cli.py index 0d13d230cf..b3627fc451 100755 --- a/cli.py +++ b/cli.py @@ -2827,9 +2827,32 @@ class HermesCLI: elif cmd_lower.startswith("/skin"): self._handle_skin_command(cmd_original) else: - # Check for skill slash commands (/gif-search, /axolotl, etc.) + # Check for user-defined quick commands (bypass agent loop, no LLM call) base_cmd = cmd_lower.split()[0] - if base_cmd in _skill_commands: + quick_commands = self.config.get("quick_commands", {}) + if base_cmd.lstrip("/") in quick_commands: + qcmd = quick_commands[base_cmd.lstrip("/")] + if qcmd.get("type") == "exec": + import subprocess + exec_cmd = qcmd.get("command", "") + if exec_cmd: + try: + result = subprocess.run( + exec_cmd, shell=True, capture_output=True, + text=True, timeout=30 + ) + output = result.stdout.strip() or result.stderr.strip() + self.console.print(output if output else "[dim]Command returned no output[/]") + except subprocess.TimeoutExpired: + self.console.print("[bold red]Quick command timed out (30s)[/]") + except Exception as e: + self.console.print(f"[bold red]Quick command error: {e}[/]") + else: + self.console.print(f"[bold red]Quick command '{base_cmd}' has no command defined[/]") + else: + self.console.print(f"[bold red]Quick command '{base_cmd}' has unsupported type (only 'exec' is supported)[/]") + # Check for skill slash commands (/gif-search, /axolotl, etc.) + elif base_cmd in _skill_commands: user_instruction = cmd_original[len(base_cmd):].strip() msg = build_skill_invocation_message(base_cmd, user_instruction) if msg: diff --git a/gateway/run.py b/gateway/run.py index 72ec62b409..be89833acd 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -869,6 +869,33 @@ class GatewayRunner: if command == "rollback": return await self._handle_rollback_command(event) + # User-defined quick commands (bypass agent loop, no LLM call) + if command: + quick_commands = self.config.get("quick_commands", {}) + if command in quick_commands: + qcmd = quick_commands[command] + if qcmd.get("type") == "exec": + import asyncio + exec_cmd = qcmd.get("command", "") + if exec_cmd: + try: + proc = await asyncio.create_subprocess_shell( + exec_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30) + output = (stdout or stderr).decode().strip() + return output if output else "Command returned no output." + except asyncio.TimeoutError: + return "Quick command timed out (30s)." + except Exception as e: + return f"Quick command error: {e}" + else: + return f"Quick command '/{command}' has no command defined." + else: + return f"Quick command '/{command}' has unsupported type (only 'exec' is supported)." + # Skill slash commands: /skill-name loads the skill and sends to agent if command: try: diff --git a/hermes_cli/config.py b/hermes_cli/config.py index ccf3debc16..06175cd3cd 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -180,6 +180,8 @@ DEFAULT_CONFIG = { # Permanently allowed dangerous command patterns (added via "always" approval) "command_allowlist": [], + # User-defined quick commands that bypass the agent loop (type: exec only) + "quick_commands": {}, # Config schema version - bump this when adding new required fields "_config_version": 6, diff --git a/tests/test_quick_commands.py b/tests/test_quick_commands.py new file mode 100644 index 0000000000..c34a3d0529 --- /dev/null +++ b/tests/test_quick_commands.py @@ -0,0 +1,137 @@ +"""Tests for user-defined quick commands that bypass the agent loop.""" +import subprocess +from unittest.mock import MagicMock, patch, AsyncMock +import pytest + + +# ── CLI tests ────────────────────────────────────────────────────────────── + +class TestCLIQuickCommands: + """Test quick command dispatch in HermesCLI.process_command.""" + + def _make_cli(self, quick_commands): + from cli import HermesCLI + cli = HermesCLI.__new__(HermesCLI) + cli.config = {"quick_commands": quick_commands} + cli.console = MagicMock() + cli.agent = None + cli.conversation_history = [] + return cli + + def test_exec_command_runs_and_prints_output(self): + cli = self._make_cli({"dn": {"type": "exec", "command": "echo daily-note"}}) + result = cli.process_command("/dn") + assert result is True + cli.console.print.assert_called_once_with("daily-note") + + def test_exec_command_stderr_shown_on_no_stdout(self): + cli = self._make_cli({"err": {"type": "exec", "command": "echo error >&2"}}) + result = cli.process_command("/err") + assert result is True + # stderr fallback — should print something + cli.console.print.assert_called_once() + + def test_exec_command_no_output_shows_fallback(self): + cli = self._make_cli({"empty": {"type": "exec", "command": "true"}}) + cli.process_command("/empty") + cli.console.print.assert_called_once() + args = cli.console.print.call_args[0][0] + assert "no output" in args.lower() + + def test_unsupported_type_shows_error(self): + cli = self._make_cli({"bad": {"type": "prompt", "command": "echo hi"}}) + cli.process_command("/bad") + cli.console.print.assert_called_once() + args = cli.console.print.call_args[0][0] + assert "unsupported type" in args.lower() + + def test_missing_command_field_shows_error(self): + cli = self._make_cli({"oops": {"type": "exec"}}) + cli.process_command("/oops") + cli.console.print.assert_called_once() + args = cli.console.print.call_args[0][0] + assert "no command defined" in args.lower() + + def test_quick_command_takes_priority_over_skill_commands(self): + """Quick commands must be checked before skill slash commands.""" + cli = self._make_cli({"mygif": {"type": "exec", "command": "echo overridden"}}) + with patch("cli._skill_commands", {"/mygif": {"name": "gif-search"}}): + cli.process_command("/mygif") + cli.console.print.assert_called_once_with("overridden") + + def test_unknown_command_still_shows_error(self): + cli = self._make_cli({}) + cli.process_command("/nonexistent") + cli.console.print.assert_called() + args = cli.console.print.call_args_list[0][0][0] + assert "unknown command" in args.lower() + + def test_timeout_shows_error(self): + cli = self._make_cli({"slow": {"type": "exec", "command": "sleep 100"}}) + with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("sleep", 30)): + cli.process_command("/slow") + cli.console.print.assert_called_once() + args = cli.console.print.call_args[0][0] + assert "timed out" in args.lower() + + +# ── Gateway tests ────────────────────────────────────────────────────────── + +class TestGatewayQuickCommands: + """Test quick command dispatch in GatewayRunner._handle_message.""" + + def _make_event(self, command, args=""): + event = MagicMock() + event.get_command.return_value = command + event.get_command_args.return_value = args + event.text = f"/{command} {args}".strip() + event.source = MagicMock() + event.source.user_id = "test_user" + event.source.user_name = "Test User" + event.source.platform.value = "telegram" + event.source.chat_type = "dm" + event.source.chat_id = "123" + return event + + @pytest.mark.asyncio + async def test_exec_command_returns_output(self): + from gateway.run import GatewayRunner + runner = GatewayRunner.__new__(GatewayRunner) + runner.config = {"quick_commands": {"limits": {"type": "exec", "command": "echo ok"}}} + runner._running_agents = {} + runner._pending_messages = {} + runner._is_user_authorized = MagicMock(return_value=True) + + event = self._make_event("limits") + result = await runner._handle_message(event) + assert result == "ok" + + @pytest.mark.asyncio + async def test_unsupported_type_returns_error(self): + from gateway.run import GatewayRunner + runner = GatewayRunner.__new__(GatewayRunner) + runner.config = {"quick_commands": {"bad": {"type": "prompt", "command": "echo hi"}}} + runner._running_agents = {} + runner._pending_messages = {} + runner._is_user_authorized = MagicMock(return_value=True) + + event = self._make_event("bad") + result = await runner._handle_message(event) + assert result is not None + assert "unsupported type" in result.lower() + + @pytest.mark.asyncio + async def test_timeout_returns_error(self): + from gateway.run import GatewayRunner + import asyncio + runner = GatewayRunner.__new__(GatewayRunner) + runner.config = {"quick_commands": {"slow": {"type": "exec", "command": "sleep 100"}}} + runner._running_agents = {} + runner._pending_messages = {} + runner._is_user_authorized = MagicMock(return_value=True) + + event = self._make_event("slow") + with patch("asyncio.wait_for", side_effect=asyncio.TimeoutError): + result = await runner._handle_message(event) + assert result is not None + assert "timed out" in result.lower()