fix: simplify ACP approval bridging

Previously ACP dangerous-command approvals mixed an invalid ACP
payload shape with partial Hermes option mapping, and the callback
plumbing was shared across worker threads. This commit uses ACP
tool-call updates, preserves Hermes once/session/always semantics,
and scopes approval callbacks to the current worker thread.

- Build permission requests with `update_tool_call` and unique
  `perm-check-*` ids in `acp_adapter/permissions.py`
- Keep ACP option mapping explicit and fail closed on unknown outcomes
  or request failures
- Set approval callbacks inside the ACP executor worker and read them
  from thread-local state in `tools/terminal_tool.py`
- Replace duplicated ACP bridge coverage with focused tests in
  `tests/acp/test_permissions.py` and add a thread-local callback test
This commit is contained in:
mr.Shu 2026-04-08 15:20:24 +02:00 committed by Teknium
parent 35ce94a2f8
commit 31b4721791
2 changed files with 221 additions and 81 deletions

View file

@ -1,10 +1,11 @@
"""ACP permission bridging — maps ACP approval requests to hermes approval callbacks."""
"""ACP permission bridging for Hermes dangerous-command approvals."""
from __future__ import annotations
import asyncio
import logging
from concurrent.futures import TimeoutError as FutureTimeout
from itertools import count
from typing import Callable
from acp.schema import (
@ -14,24 +15,87 @@ from acp.schema import (
logger = logging.getLogger(__name__)
# Maps ACP PermissionOptionKind -> hermes approval result strings
_KIND_TO_HERMES = {
# Maps ACP permission option ids to Hermes approval result strings.
# Option ids are stable across both the ``allow_permanent=True`` and
# ``allow_permanent=False`` paths even though the option list differs.
_OPTION_ID_TO_HERMES = {
"allow_once": "once",
"allow_session": "session",
"allow_always": "always",
"reject_once": "deny",
"reject_always": "deny",
"deny": "deny",
}
_PERMISSION_REQUEST_IDS = count(1)
def _build_permission_options(*, allow_permanent: bool) -> list[PermissionOption]:
"""Return ACP options that match Hermes approval semantics."""
options = [
PermissionOption(option_id="allow_once", kind="allow_once", name="Allow once"),
PermissionOption(
option_id="allow_session",
# ACP has no session-scoped kind, so use the closest persistent
# hint while keeping Hermes semantics in the option id.
kind="allow_always",
name="Allow for session",
),
]
if allow_permanent:
options.append(
PermissionOption(
option_id="allow_always",
kind="allow_always",
name="Allow always",
),
)
options.append(PermissionOption(option_id="deny", kind="reject_once", name="Deny"))
return options
def _build_permission_tool_call(command: str, description: str):
"""Return the ACP tool-call update attached to a permission request.
``request_permission`` expects a ``ToolCallUpdate`` payload produced
by ``_acp.update_tool_call`` not a ``ToolCallStart``. Each request
gets a unique ``perm-check-N`` id so concurrent requests don't collide.
"""
import acp as _acp
tool_call_id = f"perm-check-{next(_PERMISSION_REQUEST_IDS)}"
return _acp.update_tool_call(
tool_call_id,
title=description,
kind="execute",
status="pending",
content=[_acp.tool_content(_acp.text_block(f"$ {command}"))],
raw_input={"command": command, "description": description},
)
def _map_outcome_to_hermes(outcome: object, *, allowed_option_ids: set[str]) -> str:
"""Map an ACP permission outcome into Hermes approval strings."""
if not isinstance(outcome, AllowedOutcome):
return "deny"
option_id = outcome.option_id
if option_id not in allowed_option_ids:
logger.warning("Permission request returned unknown option_id: %s", option_id)
return "deny"
return _OPTION_ID_TO_HERMES.get(option_id, "deny")
def make_approval_callback(
request_permission_fn: Callable,
loop: asyncio.AbstractEventLoop,
session_id: str,
timeout: float = 60.0,
) -> Callable[[str, str], str]:
) -> Callable[..., str]:
"""
Return a hermes-compatible ``approval_callback(command, description) -> str``
that bridges to the ACP client's ``request_permission`` call.
Return a Hermes-compatible approval callback that bridges to ACP.
The callback accepts ``command`` and ``description`` plus optional
keyword arguments such as ``allow_permanent`` used by
``tools.approval.prompt_dangerous_approval()``.
Args:
request_permission_fn: The ACP connection's ``request_permission`` coroutine.
@ -40,41 +104,38 @@ def make_approval_callback(
timeout: Seconds to wait for a response before auto-denying.
"""
def _callback(command: str, description: str) -> str:
options = [
PermissionOption(option_id="allow_once", kind="allow_once", name="Allow once"),
PermissionOption(option_id="allow_always", kind="allow_always", name="Allow always"),
PermissionOption(option_id="deny", kind="reject_once", name="Deny"),
]
import acp as _acp
tool_call = _acp.start_tool_call("perm-check", command, kind="execute")
coro = request_permission_fn(
session_id=session_id,
tool_call=tool_call,
options=options,
)
def _callback(
command: str,
description: str,
*,
allow_permanent: bool = True,
**_: object,
) -> str:
options = _build_permission_options(allow_permanent=allow_permanent)
future = None
try:
tool_call = _build_permission_tool_call(command, description)
coro = request_permission_fn(
session_id=session_id,
tool_call=tool_call,
options=options,
)
future = asyncio.run_coroutine_threadsafe(coro, loop)
response = future.result(timeout=timeout)
except (FutureTimeout, Exception) as exc:
if future is not None:
future.cancel()
logger.warning("Permission request timed out or failed: %s", exc)
return "deny"
if response is None:
return "deny"
outcome = response.outcome
if isinstance(outcome, AllowedOutcome):
option_id = outcome.option_id
# Look up the kind from our options list
for opt in options:
if opt.option_id == option_id:
return _KIND_TO_HERMES.get(opt.kind, "deny")
return "once" # fallback for unknown option_id
else:
return "deny"
allowed_option_ids = {option.option_id for option in options}
return _map_outcome_to_hermes(
response.outcome,
allowed_option_ids=allowed_option_ids,
)
return _callback

View file

@ -1,89 +1,168 @@
"""Tests for acp_adapter.permissions — ACP approval bridging."""
"""Tests for acp_adapter.permissions."""
import asyncio
import inspect
from concurrent.futures import Future
from unittest.mock import MagicMock, patch
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from acp.schema import (
AllowedOutcome,
DeniedOutcome,
RequestPermissionResponse,
)
from acp_adapter.permissions import make_approval_callback
from tools.approval import prompt_dangerous_approval
def _make_response(outcome):
"""Helper to build a RequestPermissionResponse with the given outcome."""
return RequestPermissionResponse(outcome=outcome)
def _setup_callback(outcome, timeout=60.0):
"""
Create a callback wired to a mock request_permission coroutine
that resolves to the given outcome.
Returns:
(callback, mock_request_permission_fn)
"""
def _invoke_callback(
outcome,
*,
allow_permanent=True,
timeout=60.0,
use_prompt_path=False,
):
loop = MagicMock(spec=asyncio.AbstractEventLoop)
mock_rp = MagicMock(name="request_permission")
response = _make_response(outcome)
# Patch asyncio.run_coroutine_threadsafe so it returns a future
# that immediately yields the response.
request_permission = AsyncMock(name="request_permission")
future = MagicMock(spec=Future)
future.result.return_value = response
future.result.return_value = _make_response(outcome)
with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", return_value=future):
cb = make_approval_callback(mock_rp, loop, session_id="s1", timeout=timeout)
result = cb("rm -rf /", "dangerous command")
scheduled = {}
return result
def _schedule(coro, passed_loop):
scheduled["coro"] = coro
scheduled["loop"] = passed_loop
return future
with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", side_effect=_schedule):
cb = make_approval_callback(request_permission, loop, session_id="s1", timeout=timeout)
if use_prompt_path:
result = prompt_dangerous_approval(
"rm -rf /",
"dangerous command",
allow_permanent=allow_permanent,
approval_callback=cb,
)
else:
result = cb(
"rm -rf /",
"dangerous command",
allow_permanent=allow_permanent,
)
scheduled["coro"].close()
_, kwargs = request_permission.call_args
return result, kwargs, scheduled, future, loop
class TestApprovalMapping:
def test_approval_allow_once_maps_correctly(self):
outcome = AllowedOutcome(option_id="allow_once", outcome="selected")
result = _setup_callback(outcome)
class TestApprovalBridge:
def test_bridge_schedules_request_on_the_given_loop(self):
result, kwargs, scheduled, _, loop = _invoke_callback(
AllowedOutcome(option_id="allow_once", outcome="selected"),
)
tool_call = kwargs["tool_call"]
option_ids = [option.option_id for option in kwargs["options"]]
assert result == "once"
assert scheduled["loop"] is loop
assert inspect.iscoroutine(scheduled["coro"])
assert kwargs["session_id"] == "s1"
assert tool_call.session_update == "tool_call_update"
assert tool_call.tool_call_id.startswith("perm-check-")
assert tool_call.kind == "execute"
assert tool_call.status == "pending"
assert tool_call.title == "dangerous command"
assert tool_call.raw_input == {
"command": "rm -rf /",
"description": "dangerous command",
}
assert option_ids == ["allow_once", "allow_session", "allow_always", "deny"]
def test_tool_call_ids_are_unique(self):
_, first_kwargs, _, _, _ = _invoke_callback(
AllowedOutcome(option_id="allow_once", outcome="selected"),
)
_, second_kwargs, _, _, _ = _invoke_callback(
AllowedOutcome(option_id="allow_once", outcome="selected"),
)
assert first_kwargs["tool_call"].tool_call_id != second_kwargs["tool_call"].tool_call_id
def test_prompt_path_keeps_session_option_when_permanent_disabled(self):
result, kwargs, _, _, _ = _invoke_callback(
AllowedOutcome(option_id="allow_session", outcome="selected"),
allow_permanent=False,
use_prompt_path=True,
)
option_ids = [option.option_id for option in kwargs["options"]]
assert result == "session"
assert option_ids == ["allow_once", "allow_session", "deny"]
def test_allow_always_maps_correctly(self):
result, _, _, _, _ = _invoke_callback(
AllowedOutcome(option_id="allow_always", outcome="selected"),
use_prompt_path=True,
)
def test_approval_allow_always_maps_correctly(self):
outcome = AllowedOutcome(option_id="allow_always", outcome="selected")
result = _setup_callback(outcome)
assert result == "always"
def test_approval_deny_maps_correctly(self):
outcome = DeniedOutcome(outcome="cancelled")
result = _setup_callback(outcome)
assert result == "deny"
def test_denied_and_unknown_outcomes_deny(self):
denied_result, _, _, _, _ = _invoke_callback(DeniedOutcome(outcome="cancelled"))
unknown_result, _, _, _, _ = _invoke_callback(
AllowedOutcome(option_id="unexpected", outcome="selected"),
)
def test_approval_timeout_returns_deny(self):
"""When the future times out, the callback should return 'deny'."""
assert denied_result == "deny"
assert unknown_result == "deny"
def test_timeout_returns_deny_and_cancels_future(self):
loop = MagicMock(spec=asyncio.AbstractEventLoop)
mock_rp = MagicMock(name="request_permission")
request_permission = AsyncMock(name="request_permission")
future = MagicMock(spec=Future)
future.result.side_effect = TimeoutError("timed out")
with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", return_value=future):
cb = make_approval_callback(mock_rp, loop, session_id="s1", timeout=0.01)
result = cb("rm -rf /", "dangerous")
scheduled = {}
def _schedule(coro, passed_loop):
scheduled["coro"] = coro
scheduled["loop"] = passed_loop
return future
with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", side_effect=_schedule):
cb = make_approval_callback(request_permission, loop, session_id="s1", timeout=0.01)
result = cb("rm -rf /", "dangerous command")
scheduled["coro"].close()
assert result == "deny"
assert scheduled["loop"] is loop
assert future.cancel.call_count == 1
def test_approval_none_response_returns_deny(self):
"""When request_permission resolves to None, the callback should return 'deny'."""
def test_none_response_returns_deny(self):
"""When request_permission resolves to None, the callback returns 'deny'."""
loop = MagicMock(spec=asyncio.AbstractEventLoop)
mock_rp = MagicMock(name="request_permission")
request_permission = AsyncMock(name="request_permission")
future = MagicMock(spec=Future)
future.result.return_value = None
with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", return_value=future):
cb = make_approval_callback(mock_rp, loop, session_id="s1", timeout=1.0)
scheduled = {}
def _schedule(coro, passed_loop):
scheduled["coro"] = coro
scheduled["loop"] = passed_loop
return future
with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", side_effect=_schedule):
cb = make_approval_callback(request_permission, loop, session_id="s1", timeout=1.0)
result = cb("echo hi", "demo")
scheduled["coro"].close()
assert result == "deny"