mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
143 lines
5.1 KiB
Python
143 lines
5.1 KiB
Python
import queue
|
|
import threading
|
|
import time
|
|
from types import SimpleNamespace
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import cli as cli_module
|
|
from cli import HermesCLI
|
|
|
|
|
|
class _FakeBuffer:
|
|
def __init__(self, text="", cursor_position=None):
|
|
self.text = text
|
|
self.cursor_position = len(text) if cursor_position is None else cursor_position
|
|
|
|
def reset(self, append_to_history=False):
|
|
self.text = ""
|
|
self.cursor_position = 0
|
|
|
|
|
|
def _make_cli_stub():
|
|
cli = HermesCLI.__new__(HermesCLI)
|
|
cli._approval_state = None
|
|
cli._approval_deadline = 0
|
|
cli._approval_lock = threading.Lock()
|
|
cli._sudo_state = None
|
|
cli._sudo_deadline = 0
|
|
cli._modal_input_snapshot = None
|
|
cli._invalidate = MagicMock()
|
|
cli._app = SimpleNamespace(invalidate=MagicMock(), current_buffer=_FakeBuffer())
|
|
return cli
|
|
|
|
|
|
class TestCliApprovalUi:
|
|
def test_sudo_prompt_restores_existing_draft_after_response(self):
|
|
cli = _make_cli_stub()
|
|
cli._app.current_buffer = _FakeBuffer("draft command", cursor_position=5)
|
|
result = {}
|
|
|
|
def _run_callback():
|
|
result["value"] = cli._sudo_password_callback()
|
|
|
|
with patch.object(cli_module, "_cprint"):
|
|
thread = threading.Thread(target=_run_callback, daemon=True)
|
|
thread.start()
|
|
|
|
deadline = time.time() + 2
|
|
while cli._sudo_state is None and time.time() < deadline:
|
|
time.sleep(0.01)
|
|
|
|
assert cli._sudo_state is not None
|
|
assert cli._app.current_buffer.text == ""
|
|
|
|
cli._app.current_buffer.text = "secret"
|
|
cli._app.current_buffer.cursor_position = len("secret")
|
|
cli._sudo_state["response_queue"].put("secret")
|
|
|
|
thread.join(timeout=2)
|
|
|
|
assert result["value"] == "secret"
|
|
assert cli._app.current_buffer.text == "draft command"
|
|
assert cli._app.current_buffer.cursor_position == 5
|
|
|
|
def test_approval_callback_includes_view_for_long_commands(self):
|
|
cli = _make_cli_stub()
|
|
command = "sudo dd if=/tmp/githubcli-keyring.gpg of=/usr/share/keyrings/githubcli-archive-keyring.gpg bs=4M status=progress"
|
|
result = {}
|
|
|
|
def _run_callback():
|
|
result["value"] = cli._approval_callback(command, "disk copy")
|
|
|
|
thread = threading.Thread(target=_run_callback, daemon=True)
|
|
thread.start()
|
|
|
|
deadline = time.time() + 2
|
|
while cli._approval_state is None and time.time() < deadline:
|
|
time.sleep(0.01)
|
|
|
|
assert cli._approval_state is not None
|
|
assert "view" in cli._approval_state["choices"]
|
|
|
|
cli._approval_state["response_queue"].put("deny")
|
|
thread.join(timeout=2)
|
|
assert result["value"] == "deny"
|
|
|
|
def test_handle_approval_selection_view_expands_in_place(self):
|
|
cli = _make_cli_stub()
|
|
cli._approval_state = {
|
|
"command": "sudo dd if=/tmp/in of=/usr/share/keyrings/githubcli-archive-keyring.gpg bs=4M status=progress",
|
|
"description": "disk copy",
|
|
"choices": ["once", "session", "always", "deny", "view"],
|
|
"selected": 4,
|
|
"response_queue": queue.Queue(),
|
|
}
|
|
|
|
cli._handle_approval_selection()
|
|
|
|
assert cli._approval_state is not None
|
|
assert cli._approval_state["show_full"] is True
|
|
assert "view" not in cli._approval_state["choices"]
|
|
assert cli._approval_state["selected"] == 3
|
|
assert cli._approval_state["response_queue"].empty()
|
|
|
|
def test_approval_display_places_title_inside_box_not_border(self):
|
|
cli = _make_cli_stub()
|
|
cli._approval_state = {
|
|
"command": "sudo dd if=/tmp/in of=/usr/share/keyrings/githubcli-archive-keyring.gpg bs=4M status=progress",
|
|
"description": "disk copy",
|
|
"choices": ["once", "session", "always", "deny", "view"],
|
|
"selected": 0,
|
|
"response_queue": queue.Queue(),
|
|
}
|
|
|
|
fragments = cli._get_approval_display_fragments()
|
|
rendered = "".join(text for _style, text in fragments)
|
|
lines = rendered.splitlines()
|
|
|
|
assert lines[0].startswith("╭")
|
|
assert "Dangerous Command" not in lines[0]
|
|
assert any("Dangerous Command" in line for line in lines[1:3])
|
|
assert "Show full command" in rendered
|
|
assert "githubcli-archive-keyring.gpg" not in rendered
|
|
|
|
def test_approval_display_shows_full_command_after_view(self):
|
|
cli = _make_cli_stub()
|
|
full_command = "sudo dd if=/tmp/in of=/usr/share/keyrings/githubcli-archive-keyring.gpg bs=4M status=progress"
|
|
cli._approval_state = {
|
|
"command": full_command,
|
|
"description": "disk copy",
|
|
"choices": ["once", "session", "always", "deny"],
|
|
"selected": 0,
|
|
"show_full": True,
|
|
"response_queue": queue.Queue(),
|
|
}
|
|
|
|
fragments = cli._get_approval_display_fragments()
|
|
rendered = "".join(text for _style, text in fragments)
|
|
|
|
assert "..." not in rendered
|
|
assert "githubcli-" in rendered
|
|
assert "archive-" in rendered
|
|
assert "keyring.gpg" in rendered
|
|
assert "status=progress" in rendered
|