feat(acp): add session-scoped edit auto-approval

This commit is contained in:
HenkDz 2026-05-16 12:06:21 +01:00 committed by Teknium
parent 49b28d1646
commit f70e0b85dd
4 changed files with 166 additions and 5 deletions

View file

@ -40,6 +40,12 @@ _EDIT_APPROVAL_REQUESTER: ContextVar[EditApprovalRequester | None] = ContextVar(
_PERMISSION_REQUEST_IDS = count(1)
SENSITIVE_AUTO_APPROVE_NAMES = {".env", ".env.local", ".env.production", "id_rsa", "id_ed25519"}
AUTO_APPROVE_ASK = "ask"
AUTO_APPROVE_WORKSPACE = "workspace_session"
AUTO_APPROVE_SESSION = "session"
def set_edit_approval_requester(requester: EditApprovalRequester | None) -> Token:
"""Bind an ACP edit approval requester for the current context."""
@ -130,6 +136,40 @@ def build_edit_proposal(tool_name: str, arguments: dict[str, Any]) -> EditPropos
return None
def _is_sensitive_auto_approve_path(path: str) -> bool:
parts = Path(path).expanduser().parts
lowered = {part.lower() for part in parts}
if ".git" in lowered or ".ssh" in lowered:
return True
return Path(path).name.lower() in SENSITIVE_AUTO_APPROVE_NAMES
def should_auto_approve_edit(proposal: EditProposal, policy: str, cwd: str | None = None) -> bool:
"""Return whether an ACP edit proposal may bypass the prompt for this session.
This is intentionally session-scoped and conservative: sensitive paths still
ask even under autonomous policies.
"""
policy = str(policy or AUTO_APPROVE_ASK).strip()
if policy == AUTO_APPROVE_ASK or _is_sensitive_auto_approve_path(proposal.path):
return False
path = Path(proposal.path).expanduser().resolve(strict=False)
if policy == AUTO_APPROVE_SESSION:
return True
if policy == AUTO_APPROVE_WORKSPACE:
if str(path).startswith("/tmp/"):
return True
if cwd:
root = Path(cwd).expanduser().resolve(strict=False)
try:
path.relative_to(root)
return True
except ValueError:
return False
return False
def maybe_require_edit_approval(tool_name: str, arguments: dict[str, Any]) -> str | None:
"""Run ACP edit approval if bound.
@ -188,6 +228,7 @@ def make_acp_edit_approval_requester(
loop: asyncio.AbstractEventLoop,
session_id: str,
timeout: float = 60.0,
auto_approve_getter: Callable[[], tuple[str, str | None]] | None = None,
) -> EditApprovalRequester:
"""Return a sync requester that bridges edit proposals to ACP permissions."""
@ -195,6 +236,15 @@ def make_acp_edit_approval_requester(
from acp.schema import PermissionOption
from agent.async_utils import safe_schedule_threadsafe
if auto_approve_getter is not None:
try:
policy, cwd = auto_approve_getter()
if should_auto_approve_edit(proposal, policy, cwd):
logger.info("Auto-approved ACP edit under policy %s: %s", policy, proposal.path)
return True
except Exception:
logger.debug("ACP edit auto-approval policy check failed", exc_info=True)
options = [
PermissionOption(option_id="allow_once", kind="allow_once", name="Allow edit"),
PermissionOption(option_id="deny", kind="reject_once", name="Deny"),

View file

@ -45,6 +45,8 @@ from acp.schema import (
SetSessionModeResponse,
ResourceContentBlock,
SessionCapabilities,
SessionConfigOptionSelect,
SessionConfigSelectOption,
SessionForkCapabilities,
SessionListCapabilities,
SessionModelState,
@ -495,6 +497,9 @@ class HermesACPAgent(acp.Agent):
},
)
_EDIT_APPROVAL_POLICY_CONFIG_ID = "edit_approval_policy"
_EDIT_APPROVAL_POLICY_DEFAULT = "ask"
def __init__(self, session_manager: SessionManager | None = None):
super().__init__()
self.session_manager = session_manager or SessionManager()
@ -507,6 +512,49 @@ class HermesACPAgent(acp.Agent):
self._conn = conn
logger.info("ACP client connected")
def _session_config_options(self, state: SessionState) -> list[Any]:
values = getattr(state, "config_options", None)
if not isinstance(values, dict):
values = {}
current = str(values.get(self._EDIT_APPROVAL_POLICY_CONFIG_ID) or self._EDIT_APPROVAL_POLICY_DEFAULT)
allowed = {"ask", "workspace_session", "session"}
if current not in allowed:
current = self._EDIT_APPROVAL_POLICY_DEFAULT
return [
SessionConfigOptionSelect(
id=self._EDIT_APPROVAL_POLICY_CONFIG_ID,
name="Edit approvals",
description="Control ACP edit approvals for this session.",
category="permissions",
type="select",
current_value=current,
options=[
SessionConfigSelectOption(
value="ask",
name="Ask before edits",
description="Require approval for every file edit.",
),
SessionConfigSelectOption(
value="workspace_session",
name="Auto-allow workspace edits",
description="Allow workspace and /tmp edits for this session; still asks for sensitive paths.",
),
SessionConfigSelectOption(
value="session",
name="Auto-allow all edits this session",
description="Allow file edits for this session except sensitive paths.",
),
],
)
]
def _edit_approval_policy_for_state(self, state: SessionState) -> tuple[str, str | None]:
values = getattr(state, "config_options", None)
if not isinstance(values, dict):
values = {}
return str(values.get(self._EDIT_APPROVAL_POLICY_CONFIG_ID) or self._EDIT_APPROVAL_POLICY_DEFAULT), state.cwd
@staticmethod
def _encode_model_choice(provider: str | None, model: str | None) -> str:
"""Encode a model selection so ACP clients can keep provider context."""
@ -992,6 +1040,7 @@ class HermesACPAgent(acp.Agent):
return NewSessionResponse(
session_id=state.session_id,
models=self._build_model_state(state),
config_options=self._session_config_options(state),
)
async def load_session(
@ -1033,7 +1082,10 @@ class HermesACPAgent(acp.Agent):
)
self._schedule_available_commands_update(session_id)
self._schedule_usage_update(state)
return LoadSessionResponse(models=self._build_model_state(state))
return LoadSessionResponse(
models=self._build_model_state(state),
config_options=self._session_config_options(state),
)
async def resume_session(
self,
@ -1062,7 +1114,10 @@ class HermesACPAgent(acp.Agent):
)
self._schedule_available_commands_update(state.session_id)
self._schedule_usage_update(state)
return ResumeSessionResponse(models=self._build_model_state(state))
return ResumeSessionResponse(
models=self._build_model_state(state),
config_options=self._session_config_options(state),
)
async def cancel(self, session_id: str, **kwargs: Any) -> None:
state = self.session_manager.get_session(session_id)
@ -1092,7 +1147,11 @@ class HermesACPAgent(acp.Agent):
logger.info("Forked session %s -> %s", session_id, new_id)
if new_id:
self._schedule_available_commands_update(new_id)
return ForkSessionResponse(session_id=new_id)
return ForkSessionResponse(
session_id=new_id,
models=self._build_model_state(state) if state is not None else None,
config_options=self._session_config_options(state) if state is not None else None,
)
async def list_sessions(
self,
@ -1267,6 +1326,7 @@ class HermesACPAgent(acp.Agent):
conn.request_permission,
loop,
session_id,
auto_approve_getter=lambda: self._edit_approval_policy_for_state(state),
)
except Exception:
logger.debug("Could not create ACP edit approval requester", exc_info=True)
@ -1810,4 +1870,4 @@ class HermesACPAgent(acp.Agent):
setattr(state, "config_options", options)
self.session_manager.save_session(session_id)
logger.info("Session %s: config option %s updated", session_id, config_id)
return SetSessionConfigOptionResponse(config_options=[])
return SetSessionConfigOptionResponse(config_options=self._session_config_options(state))

View file

@ -3,12 +3,14 @@
from __future__ import annotations
import json
from pathlib import Path
from acp_adapter.edit_approval import (
EditProposal,
build_acp_edit_tool_call,
clear_edit_approval_requester,
set_edit_approval_requester,
should_auto_approve_edit,
)
from model_tools import handle_function_call
@ -177,3 +179,25 @@ def test_patch_replace_approval_request_includes_full_file_diff(tmp_path):
assert proposals[0].tool_name == "patch"
assert proposals[0].old_text == "alpha\nbeta\n"
assert proposals[0].new_text == "alpha\ngamma\n"
def test_workspace_auto_approval_allows_workspace_and_tmp_but_not_sensitive(tmp_path):
workspace_file = tmp_path / "src.py"
tmp_file = Path("/tmp/hermes-acp-auto-approve-test.txt")
env_file = tmp_path / ".env"
assert should_auto_approve_edit(
EditProposal("write_file", str(workspace_file), None, "x", {}),
"workspace_session",
str(tmp_path),
)
assert should_auto_approve_edit(
EditProposal("write_file", str(tmp_file), None, "x", {}),
"workspace_session",
str(tmp_path),
)
assert not should_auto_approve_edit(
EditProposal("write_file", str(env_file), None, "SECRET=x", {}),
"session",
str(tmp_path),
)

View file

@ -52,6 +52,32 @@ def agent(mock_manager):
"""HermesACPAgent backed by a mock session manager."""
return HermesACPAgent(session_manager=mock_manager)
@pytest.mark.asyncio
async def test_new_session_includes_edit_approval_config_option(self, agent):
resp = await agent.new_session(cwd="/tmp")
assert resp.config_options
option = resp.config_options[0]
assert option.id == "edit_approval_policy"
assert option.current_value == "ask"
assert {choice.value for choice in option.options} == {
"ask",
"workspace_session",
"session",
}
@pytest.mark.asyncio
async def test_set_config_option_persists_edit_approval_policy(self, agent):
resp = await agent.new_session(cwd="/tmp")
update = await agent.set_config_option(
"edit_approval_policy",
resp.session_id,
"workspace_session",
)
assert isinstance(update, SetSessionConfigOptionResponse)
assert update.config_options[0].current_value == "workspace_session"
# ---------------------------------------------------------------------------
# initialize
@ -892,7 +918,8 @@ class TestSessionConfiguration:
)
assert mode_result == {}
assert config_result == {"configOptions": []}
assert config_result["configOptions"]
assert config_result["configOptions"][0]["id"] == "edit_approval_policy"
@pytest.mark.asyncio
async def test_router_accepts_unstable_model_switch_when_enabled(self, agent):