diff --git a/acp_adapter/edit_approval.py b/acp_adapter/edit_approval.py index ebeab0bc7ec..7c5fcaefd22 100644 --- a/acp_adapter/edit_approval.py +++ b/acp_adapter/edit_approval.py @@ -40,6 +40,12 @@ _EDIT_APPROVAL_REQUESTER: ContextVar[EditApprovalRequester | None] = ContextVar( _PERMISSION_REQUEST_IDS = count(1) +SENSITIVE_AUTO_APPROVE_NAMES = {".env", ".env.local", ".env.production", "id_rsa", "id_ed25519"} +AUTO_APPROVE_ASK = "ask" +AUTO_APPROVE_WORKSPACE = "workspace_session" +AUTO_APPROVE_SESSION = "session" + + def set_edit_approval_requester(requester: EditApprovalRequester | None) -> Token: """Bind an ACP edit approval requester for the current context.""" @@ -130,6 +136,40 @@ def build_edit_proposal(tool_name: str, arguments: dict[str, Any]) -> EditPropos return None +def _is_sensitive_auto_approve_path(path: str) -> bool: + parts = Path(path).expanduser().parts + lowered = {part.lower() for part in parts} + if ".git" in lowered or ".ssh" in lowered: + return True + return Path(path).name.lower() in SENSITIVE_AUTO_APPROVE_NAMES + + +def should_auto_approve_edit(proposal: EditProposal, policy: str, cwd: str | None = None) -> bool: + """Return whether an ACP edit proposal may bypass the prompt for this session. + + This is intentionally session-scoped and conservative: sensitive paths still + ask even under autonomous policies. + """ + + policy = str(policy or AUTO_APPROVE_ASK).strip() + if policy == AUTO_APPROVE_ASK or _is_sensitive_auto_approve_path(proposal.path): + return False + path = Path(proposal.path).expanduser().resolve(strict=False) + if policy == AUTO_APPROVE_SESSION: + return True + if policy == AUTO_APPROVE_WORKSPACE: + if str(path).startswith("/tmp/"): + return True + if cwd: + root = Path(cwd).expanduser().resolve(strict=False) + try: + path.relative_to(root) + return True + except ValueError: + return False + return False + + def maybe_require_edit_approval(tool_name: str, arguments: dict[str, Any]) -> str | None: """Run ACP edit approval if bound. @@ -188,6 +228,7 @@ def make_acp_edit_approval_requester( loop: asyncio.AbstractEventLoop, session_id: str, timeout: float = 60.0, + auto_approve_getter: Callable[[], tuple[str, str | None]] | None = None, ) -> EditApprovalRequester: """Return a sync requester that bridges edit proposals to ACP permissions.""" @@ -195,6 +236,15 @@ def make_acp_edit_approval_requester( from acp.schema import PermissionOption from agent.async_utils import safe_schedule_threadsafe + if auto_approve_getter is not None: + try: + policy, cwd = auto_approve_getter() + if should_auto_approve_edit(proposal, policy, cwd): + logger.info("Auto-approved ACP edit under policy %s: %s", policy, proposal.path) + return True + except Exception: + logger.debug("ACP edit auto-approval policy check failed", exc_info=True) + options = [ PermissionOption(option_id="allow_once", kind="allow_once", name="Allow edit"), PermissionOption(option_id="deny", kind="reject_once", name="Deny"), diff --git a/acp_adapter/server.py b/acp_adapter/server.py index ebec969205c..62f8eafe6fc 100644 --- a/acp_adapter/server.py +++ b/acp_adapter/server.py @@ -45,6 +45,8 @@ from acp.schema import ( SetSessionModeResponse, ResourceContentBlock, SessionCapabilities, + SessionConfigOptionSelect, + SessionConfigSelectOption, SessionForkCapabilities, SessionListCapabilities, SessionModelState, @@ -495,6 +497,9 @@ class HermesACPAgent(acp.Agent): }, ) + _EDIT_APPROVAL_POLICY_CONFIG_ID = "edit_approval_policy" + _EDIT_APPROVAL_POLICY_DEFAULT = "ask" + def __init__(self, session_manager: SessionManager | None = None): super().__init__() self.session_manager = session_manager or SessionManager() @@ -507,6 +512,49 @@ class HermesACPAgent(acp.Agent): self._conn = conn logger.info("ACP client connected") + + def _session_config_options(self, state: SessionState) -> list[Any]: + values = getattr(state, "config_options", None) + if not isinstance(values, dict): + values = {} + current = str(values.get(self._EDIT_APPROVAL_POLICY_CONFIG_ID) or self._EDIT_APPROVAL_POLICY_DEFAULT) + allowed = {"ask", "workspace_session", "session"} + if current not in allowed: + current = self._EDIT_APPROVAL_POLICY_DEFAULT + return [ + SessionConfigOptionSelect( + id=self._EDIT_APPROVAL_POLICY_CONFIG_ID, + name="Edit approvals", + description="Control ACP edit approvals for this session.", + category="permissions", + type="select", + current_value=current, + options=[ + SessionConfigSelectOption( + value="ask", + name="Ask before edits", + description="Require approval for every file edit.", + ), + SessionConfigSelectOption( + value="workspace_session", + name="Auto-allow workspace edits", + description="Allow workspace and /tmp edits for this session; still asks for sensitive paths.", + ), + SessionConfigSelectOption( + value="session", + name="Auto-allow all edits this session", + description="Allow file edits for this session except sensitive paths.", + ), + ], + ) + ] + + def _edit_approval_policy_for_state(self, state: SessionState) -> tuple[str, str | None]: + values = getattr(state, "config_options", None) + if not isinstance(values, dict): + values = {} + return str(values.get(self._EDIT_APPROVAL_POLICY_CONFIG_ID) or self._EDIT_APPROVAL_POLICY_DEFAULT), state.cwd + @staticmethod def _encode_model_choice(provider: str | None, model: str | None) -> str: """Encode a model selection so ACP clients can keep provider context.""" @@ -992,6 +1040,7 @@ class HermesACPAgent(acp.Agent): return NewSessionResponse( session_id=state.session_id, models=self._build_model_state(state), + config_options=self._session_config_options(state), ) async def load_session( @@ -1033,7 +1082,10 @@ class HermesACPAgent(acp.Agent): ) self._schedule_available_commands_update(session_id) self._schedule_usage_update(state) - return LoadSessionResponse(models=self._build_model_state(state)) + return LoadSessionResponse( + models=self._build_model_state(state), + config_options=self._session_config_options(state), + ) async def resume_session( self, @@ -1062,7 +1114,10 @@ class HermesACPAgent(acp.Agent): ) self._schedule_available_commands_update(state.session_id) self._schedule_usage_update(state) - return ResumeSessionResponse(models=self._build_model_state(state)) + return ResumeSessionResponse( + models=self._build_model_state(state), + config_options=self._session_config_options(state), + ) async def cancel(self, session_id: str, **kwargs: Any) -> None: state = self.session_manager.get_session(session_id) @@ -1092,7 +1147,11 @@ class HermesACPAgent(acp.Agent): logger.info("Forked session %s -> %s", session_id, new_id) if new_id: self._schedule_available_commands_update(new_id) - return ForkSessionResponse(session_id=new_id) + return ForkSessionResponse( + session_id=new_id, + models=self._build_model_state(state) if state is not None else None, + config_options=self._session_config_options(state) if state is not None else None, + ) async def list_sessions( self, @@ -1267,6 +1326,7 @@ class HermesACPAgent(acp.Agent): conn.request_permission, loop, session_id, + auto_approve_getter=lambda: self._edit_approval_policy_for_state(state), ) except Exception: logger.debug("Could not create ACP edit approval requester", exc_info=True) @@ -1810,4 +1870,4 @@ class HermesACPAgent(acp.Agent): setattr(state, "config_options", options) self.session_manager.save_session(session_id) logger.info("Session %s: config option %s updated", session_id, config_id) - return SetSessionConfigOptionResponse(config_options=[]) + return SetSessionConfigOptionResponse(config_options=self._session_config_options(state)) diff --git a/tests/acp/test_edit_approval.py b/tests/acp/test_edit_approval.py index 2d68e22045b..1b6660c3b2f 100644 --- a/tests/acp/test_edit_approval.py +++ b/tests/acp/test_edit_approval.py @@ -3,12 +3,14 @@ from __future__ import annotations import json +from pathlib import Path from acp_adapter.edit_approval import ( EditProposal, build_acp_edit_tool_call, clear_edit_approval_requester, set_edit_approval_requester, + should_auto_approve_edit, ) from model_tools import handle_function_call @@ -177,3 +179,25 @@ def test_patch_replace_approval_request_includes_full_file_diff(tmp_path): assert proposals[0].tool_name == "patch" assert proposals[0].old_text == "alpha\nbeta\n" assert proposals[0].new_text == "alpha\ngamma\n" + + +def test_workspace_auto_approval_allows_workspace_and_tmp_but_not_sensitive(tmp_path): + workspace_file = tmp_path / "src.py" + tmp_file = Path("/tmp/hermes-acp-auto-approve-test.txt") + env_file = tmp_path / ".env" + + assert should_auto_approve_edit( + EditProposal("write_file", str(workspace_file), None, "x", {}), + "workspace_session", + str(tmp_path), + ) + assert should_auto_approve_edit( + EditProposal("write_file", str(tmp_file), None, "x", {}), + "workspace_session", + str(tmp_path), + ) + assert not should_auto_approve_edit( + EditProposal("write_file", str(env_file), None, "SECRET=x", {}), + "session", + str(tmp_path), + ) diff --git a/tests/acp/test_server.py b/tests/acp/test_server.py index 65dd6fd6b72..e17d9c618cb 100644 --- a/tests/acp/test_server.py +++ b/tests/acp/test_server.py @@ -52,6 +52,32 @@ def agent(mock_manager): """HermesACPAgent backed by a mock session manager.""" return HermesACPAgent(session_manager=mock_manager) + @pytest.mark.asyncio + async def test_new_session_includes_edit_approval_config_option(self, agent): + resp = await agent.new_session(cwd="/tmp") + + assert resp.config_options + option = resp.config_options[0] + assert option.id == "edit_approval_policy" + assert option.current_value == "ask" + assert {choice.value for choice in option.options} == { + "ask", + "workspace_session", + "session", + } + + @pytest.mark.asyncio + async def test_set_config_option_persists_edit_approval_policy(self, agent): + resp = await agent.new_session(cwd="/tmp") + update = await agent.set_config_option( + "edit_approval_policy", + resp.session_id, + "workspace_session", + ) + + assert isinstance(update, SetSessionConfigOptionResponse) + assert update.config_options[0].current_value == "workspace_session" + # --------------------------------------------------------------------------- # initialize @@ -892,7 +918,8 @@ class TestSessionConfiguration: ) assert mode_result == {} - assert config_result == {"configOptions": []} + assert config_result["configOptions"] + assert config_result["configOptions"][0]["id"] == "edit_approval_policy" @pytest.mark.asyncio async def test_router_accepts_unstable_model_switch_when_enabled(self, agent):