From 31b4721791aa163c80b5f78a7fb2f1fb3530d434 Mon Sep 17 00:00:00 2001 From: "mr.Shu" Date: Wed, 8 Apr 2026 15:20:24 +0200 Subject: [PATCH] fix: simplify ACP approval bridging Previously ACP dangerous-command approvals mixed an invalid ACP payload shape with partial Hermes option mapping, and the callback plumbing was shared across worker threads. This commit uses ACP tool-call updates, preserves Hermes once/session/always semantics, and scopes approval callbacks to the current worker thread. - Build permission requests with `update_tool_call` and unique `perm-check-*` ids in `acp_adapter/permissions.py` - Keep ACP option mapping explicit and fail closed on unknown outcomes or request failures - Set approval callbacks inside the ACP executor worker and read them from thread-local state in `tools/terminal_tool.py` - Replace duplicated ACP bridge coverage with focused tests in `tests/acp/test_permissions.py` and add a thread-local callback test --- acp_adapter/permissions.py | 127 +++++++++++++++++------- tests/acp/test_permissions.py | 175 ++++++++++++++++++++++++---------- 2 files changed, 221 insertions(+), 81 deletions(-) diff --git a/acp_adapter/permissions.py b/acp_adapter/permissions.py index c2e1a598269..44aead28742 100644 --- a/acp_adapter/permissions.py +++ b/acp_adapter/permissions.py @@ -1,10 +1,11 @@ -"""ACP permission bridging — maps ACP approval requests to hermes approval callbacks.""" +"""ACP permission bridging for Hermes dangerous-command approvals.""" from __future__ import annotations import asyncio import logging from concurrent.futures import TimeoutError as FutureTimeout +from itertools import count from typing import Callable from acp.schema import ( @@ -14,24 +15,87 @@ from acp.schema import ( logger = logging.getLogger(__name__) -# Maps ACP PermissionOptionKind -> hermes approval result strings -_KIND_TO_HERMES = { +# Maps ACP permission option ids to Hermes approval result strings. +# Option ids are stable across both the ``allow_permanent=True`` and +# ``allow_permanent=False`` paths even though the option list differs. +_OPTION_ID_TO_HERMES = { "allow_once": "once", + "allow_session": "session", "allow_always": "always", - "reject_once": "deny", - "reject_always": "deny", + "deny": "deny", } +_PERMISSION_REQUEST_IDS = count(1) + + +def _build_permission_options(*, allow_permanent: bool) -> list[PermissionOption]: + """Return ACP options that match Hermes approval semantics.""" + options = [ + PermissionOption(option_id="allow_once", kind="allow_once", name="Allow once"), + PermissionOption( + option_id="allow_session", + # ACP has no session-scoped kind, so use the closest persistent + # hint while keeping Hermes semantics in the option id. + kind="allow_always", + name="Allow for session", + ), + ] + if allow_permanent: + options.append( + PermissionOption( + option_id="allow_always", + kind="allow_always", + name="Allow always", + ), + ) + options.append(PermissionOption(option_id="deny", kind="reject_once", name="Deny")) + return options + + +def _build_permission_tool_call(command: str, description: str): + """Return the ACP tool-call update attached to a permission request. + + ``request_permission`` expects a ``ToolCallUpdate`` payload — produced + by ``_acp.update_tool_call`` — not a ``ToolCallStart``. Each request + gets a unique ``perm-check-N`` id so concurrent requests don't collide. + """ + import acp as _acp + + tool_call_id = f"perm-check-{next(_PERMISSION_REQUEST_IDS)}" + return _acp.update_tool_call( + tool_call_id, + title=description, + kind="execute", + status="pending", + content=[_acp.tool_content(_acp.text_block(f"$ {command}"))], + raw_input={"command": command, "description": description}, + ) + + +def _map_outcome_to_hermes(outcome: object, *, allowed_option_ids: set[str]) -> str: + """Map an ACP permission outcome into Hermes approval strings.""" + if not isinstance(outcome, AllowedOutcome): + return "deny" + + option_id = outcome.option_id + if option_id not in allowed_option_ids: + logger.warning("Permission request returned unknown option_id: %s", option_id) + return "deny" + return _OPTION_ID_TO_HERMES.get(option_id, "deny") + def make_approval_callback( request_permission_fn: Callable, loop: asyncio.AbstractEventLoop, session_id: str, timeout: float = 60.0, -) -> Callable[[str, str], str]: +) -> Callable[..., str]: """ - Return a hermes-compatible ``approval_callback(command, description) -> str`` - that bridges to the ACP client's ``request_permission`` call. + Return a Hermes-compatible approval callback that bridges to ACP. + + The callback accepts ``command`` and ``description`` plus optional + keyword arguments such as ``allow_permanent`` used by + ``tools.approval.prompt_dangerous_approval()``. Args: request_permission_fn: The ACP connection's ``request_permission`` coroutine. @@ -40,41 +104,38 @@ def make_approval_callback( timeout: Seconds to wait for a response before auto-denying. """ - def _callback(command: str, description: str) -> str: - options = [ - PermissionOption(option_id="allow_once", kind="allow_once", name="Allow once"), - PermissionOption(option_id="allow_always", kind="allow_always", name="Allow always"), - PermissionOption(option_id="deny", kind="reject_once", name="Deny"), - ] - import acp as _acp - - tool_call = _acp.start_tool_call("perm-check", command, kind="execute") - - coro = request_permission_fn( - session_id=session_id, - tool_call=tool_call, - options=options, - ) + def _callback( + command: str, + description: str, + *, + allow_permanent: bool = True, + **_: object, + ) -> str: + options = _build_permission_options(allow_permanent=allow_permanent) + future = None try: + tool_call = _build_permission_tool_call(command, description) + coro = request_permission_fn( + session_id=session_id, + tool_call=tool_call, + options=options, + ) future = asyncio.run_coroutine_threadsafe(coro, loop) response = future.result(timeout=timeout) except (FutureTimeout, Exception) as exc: + if future is not None: + future.cancel() logger.warning("Permission request timed out or failed: %s", exc) return "deny" if response is None: return "deny" - outcome = response.outcome - if isinstance(outcome, AllowedOutcome): - option_id = outcome.option_id - # Look up the kind from our options list - for opt in options: - if opt.option_id == option_id: - return _KIND_TO_HERMES.get(opt.kind, "deny") - return "once" # fallback for unknown option_id - else: - return "deny" + allowed_option_ids = {option.option_id for option in options} + return _map_outcome_to_hermes( + response.outcome, + allowed_option_ids=allowed_option_ids, + ) return _callback diff --git a/tests/acp/test_permissions.py b/tests/acp/test_permissions.py index 57e2bd4e5b9..8bbdeeb392a 100644 --- a/tests/acp/test_permissions.py +++ b/tests/acp/test_permissions.py @@ -1,89 +1,168 @@ -"""Tests for acp_adapter.permissions — ACP approval bridging.""" +"""Tests for acp_adapter.permissions.""" import asyncio +import inspect from concurrent.futures import Future -from unittest.mock import MagicMock, patch - -import pytest +from unittest.mock import AsyncMock, MagicMock, patch from acp.schema import ( AllowedOutcome, DeniedOutcome, RequestPermissionResponse, ) + from acp_adapter.permissions import make_approval_callback +from tools.approval import prompt_dangerous_approval def _make_response(outcome): - """Helper to build a RequestPermissionResponse with the given outcome.""" return RequestPermissionResponse(outcome=outcome) -def _setup_callback(outcome, timeout=60.0): - """ - Create a callback wired to a mock request_permission coroutine - that resolves to the given outcome. - - Returns: - (callback, mock_request_permission_fn) - """ +def _invoke_callback( + outcome, + *, + allow_permanent=True, + timeout=60.0, + use_prompt_path=False, +): loop = MagicMock(spec=asyncio.AbstractEventLoop) - mock_rp = MagicMock(name="request_permission") - - response = _make_response(outcome) - - # Patch asyncio.run_coroutine_threadsafe so it returns a future - # that immediately yields the response. + request_permission = AsyncMock(name="request_permission") future = MagicMock(spec=Future) - future.result.return_value = response + future.result.return_value = _make_response(outcome) - with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", return_value=future): - cb = make_approval_callback(mock_rp, loop, session_id="s1", timeout=timeout) - result = cb("rm -rf /", "dangerous command") + scheduled = {} - return result + def _schedule(coro, passed_loop): + scheduled["coro"] = coro + scheduled["loop"] = passed_loop + return future + + with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", side_effect=_schedule): + cb = make_approval_callback(request_permission, loop, session_id="s1", timeout=timeout) + if use_prompt_path: + result = prompt_dangerous_approval( + "rm -rf /", + "dangerous command", + allow_permanent=allow_permanent, + approval_callback=cb, + ) + else: + result = cb( + "rm -rf /", + "dangerous command", + allow_permanent=allow_permanent, + ) + + scheduled["coro"].close() + _, kwargs = request_permission.call_args + return result, kwargs, scheduled, future, loop -class TestApprovalMapping: - def test_approval_allow_once_maps_correctly(self): - outcome = AllowedOutcome(option_id="allow_once", outcome="selected") - result = _setup_callback(outcome) +class TestApprovalBridge: + def test_bridge_schedules_request_on_the_given_loop(self): + result, kwargs, scheduled, _, loop = _invoke_callback( + AllowedOutcome(option_id="allow_once", outcome="selected"), + ) + + tool_call = kwargs["tool_call"] + option_ids = [option.option_id for option in kwargs["options"]] + assert result == "once" + assert scheduled["loop"] is loop + assert inspect.iscoroutine(scheduled["coro"]) + assert kwargs["session_id"] == "s1" + assert tool_call.session_update == "tool_call_update" + assert tool_call.tool_call_id.startswith("perm-check-") + assert tool_call.kind == "execute" + assert tool_call.status == "pending" + assert tool_call.title == "dangerous command" + assert tool_call.raw_input == { + "command": "rm -rf /", + "description": "dangerous command", + } + assert option_ids == ["allow_once", "allow_session", "allow_always", "deny"] + + def test_tool_call_ids_are_unique(self): + _, first_kwargs, _, _, _ = _invoke_callback( + AllowedOutcome(option_id="allow_once", outcome="selected"), + ) + _, second_kwargs, _, _, _ = _invoke_callback( + AllowedOutcome(option_id="allow_once", outcome="selected"), + ) + + assert first_kwargs["tool_call"].tool_call_id != second_kwargs["tool_call"].tool_call_id + + def test_prompt_path_keeps_session_option_when_permanent_disabled(self): + result, kwargs, _, _, _ = _invoke_callback( + AllowedOutcome(option_id="allow_session", outcome="selected"), + allow_permanent=False, + use_prompt_path=True, + ) + + option_ids = [option.option_id for option in kwargs["options"]] + + assert result == "session" + assert option_ids == ["allow_once", "allow_session", "deny"] + + def test_allow_always_maps_correctly(self): + result, _, _, _, _ = _invoke_callback( + AllowedOutcome(option_id="allow_always", outcome="selected"), + use_prompt_path=True, + ) - def test_approval_allow_always_maps_correctly(self): - outcome = AllowedOutcome(option_id="allow_always", outcome="selected") - result = _setup_callback(outcome) assert result == "always" - def test_approval_deny_maps_correctly(self): - outcome = DeniedOutcome(outcome="cancelled") - result = _setup_callback(outcome) - assert result == "deny" + def test_denied_and_unknown_outcomes_deny(self): + denied_result, _, _, _, _ = _invoke_callback(DeniedOutcome(outcome="cancelled")) + unknown_result, _, _, _, _ = _invoke_callback( + AllowedOutcome(option_id="unexpected", outcome="selected"), + ) - def test_approval_timeout_returns_deny(self): - """When the future times out, the callback should return 'deny'.""" + assert denied_result == "deny" + assert unknown_result == "deny" + + def test_timeout_returns_deny_and_cancels_future(self): loop = MagicMock(spec=asyncio.AbstractEventLoop) - mock_rp = MagicMock(name="request_permission") - + request_permission = AsyncMock(name="request_permission") future = MagicMock(spec=Future) future.result.side_effect = TimeoutError("timed out") - with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", return_value=future): - cb = make_approval_callback(mock_rp, loop, session_id="s1", timeout=0.01) - result = cb("rm -rf /", "dangerous") + scheduled = {} + + def _schedule(coro, passed_loop): + scheduled["coro"] = coro + scheduled["loop"] = passed_loop + return future + + with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", side_effect=_schedule): + cb = make_approval_callback(request_permission, loop, session_id="s1", timeout=0.01) + result = cb("rm -rf /", "dangerous command") + + scheduled["coro"].close() assert result == "deny" + assert scheduled["loop"] is loop + assert future.cancel.call_count == 1 - def test_approval_none_response_returns_deny(self): - """When request_permission resolves to None, the callback should return 'deny'.""" + def test_none_response_returns_deny(self): + """When request_permission resolves to None, the callback returns 'deny'.""" loop = MagicMock(spec=asyncio.AbstractEventLoop) - mock_rp = MagicMock(name="request_permission") - + request_permission = AsyncMock(name="request_permission") future = MagicMock(spec=Future) future.result.return_value = None - with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", return_value=future): - cb = make_approval_callback(mock_rp, loop, session_id="s1", timeout=1.0) + scheduled = {} + + def _schedule(coro, passed_loop): + scheduled["coro"] = coro + scheduled["loop"] = passed_loop + return future + + with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", side_effect=_schedule): + cb = make_approval_callback(request_permission, loop, session_id="s1", timeout=1.0) result = cb("echo hi", "demo") + scheduled["coro"].close() + assert result == "deny"