"""Focused regressions for the Copilot ACP shim safety layer."""

from __future__ import annotations

import io
import json
import os
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch

from agent.copilot_acp_client import CopilotACPClient


class _FakeProcess:
    """Bare-bones process double that exposes only an in-memory ``stdin``."""

    def __init__(self) -> None:
        # The tests read this buffer back to inspect what was written to it.
        self.stdin = io.StringIO()


class CopilotACPClientSafetyTests(unittest.TestCase):
    """Exercise ``CopilotACPClient._handle_server_message`` with risky requests.

    Each test feeds one JSON-RPC request to the handler and inspects the
    single JSON reply captured from the fake process's ``stdin``.
    """

    def setUp(self) -> None:
        """Create a fresh client for every test."""
        self.client = CopilotACPClient(acp_cwd="/tmp")

    @staticmethod
    def _rpc_request(request_id: int, method: str, params: dict) -> dict:
        """Build the JSON-RPC 2.0 request envelope used by the tests."""
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method,
            "params": params,
        }

    def _dispatch(self, message: dict, *, cwd: str) -> dict:
        """Send ``message`` through the handler and return the decoded reply.

        Args:
            message: The JSON-RPC request to hand to the client.
            cwd: Working directory forwarded to the handler.

        Returns:
            The JSON object that was written to the fake process's ``stdin``.

        Fails the test if the handler does not report the message as handled
        or if nothing was written to ``stdin``.
        """
        fake = _FakeProcess()
        was_handled = self.client._handle_server_message(
            message,
            process=fake,
            cwd=cwd,
            text_parts=[],
            reasoning_parts=[],
        )
        self.assertTrue(was_handled)

        raw_reply = fake.stdin.getvalue().strip()
        self.assertTrue(raw_reply)
        return json.loads(raw_reply)

    def test_request_permission_is_not_auto_allowed(self) -> None:
        """A permission request must be answered with a ``cancelled`` outcome."""
        request = self._rpc_request(1, "session/request_permission", {})
        reply = self._dispatch(request, cwd="/tmp")

        # Walk result -> outcome -> outcome, tolerating missing/empty levels.
        result = reply.get("result") or {}
        outcome_block = result.get("outcome") or {}
        self.assertEqual(outcome_block.get("outcome"), "cancelled")

    def test_read_text_file_blocks_internal_hermes_hub_files(self) -> None:
        """Reading a file under ``.hermes/skills/.hub`` must yield an error."""
        with tempfile.TemporaryDirectory() as scratch:
            fake_home = Path(scratch) / "home"
            hermes_home = fake_home / ".hermes"
            hub_entry = hermes_home / "skills" / ".hub" / "index-cache" / "entry.json"
            hub_entry.parent.mkdir(parents=True, exist_ok=True)
            hub_entry.write_text('{"token":"sk-test-secret-1234567890"}')

            env_overrides = {"HOME": str(fake_home), "HERMES_HOME": str(hermes_home)}
            request = self._rpc_request(
                2, "fs/read_text_file", {"path": str(hub_entry)}
            )
            with patch.dict(os.environ, env_overrides, clear=False):
                reply = self._dispatch(request, cwd=str(fake_home))

            self.assertIn("error", reply)

    def test_read_text_file_redacts_sensitive_content(self) -> None:
        """File content is returned with the API key value masked out."""
        with tempfile.TemporaryDirectory() as scratch:
            workspace = Path(scratch)
            env_file = workspace / "config.env"
            env_file.write_text("OPENAI_API_KEY=sk-proj-abc123def456ghi789jkl012")

            request = self._rpc_request(
                3, "fs/read_text_file", {"path": str(env_file)}
            )
            reply = self._dispatch(request, cwd=str(workspace))

            result = reply.get("result") or {}
            text = result.get("content") or ""
            # The secret body must be gone while the variable name survives.
            self.assertNotIn("abc123def456", text)
            self.assertIn("OPENAI_API_KEY=", text)

    def test_write_text_file_reuses_write_denylist(self) -> None:
        """A write is refused when ``is_write_denied`` reports the path as denied."""
        with tempfile.TemporaryDirectory() as scratch:
            fake_home = Path(scratch) / "home"
            key_path = fake_home / ".ssh" / "id_rsa"
            key_path.parent.mkdir(parents=True, exist_ok=True)

            request = self._rpc_request(
                4,
                "fs/write_text_file",
                {
                    "path": str(key_path),
                    "content": "fake-private-key",
                },
            )
            # create=True lets the patch apply even if the client module does
            # not currently expose the name.
            with patch(
                "agent.copilot_acp_client.is_write_denied",
                return_value=True,
                create=True,
            ):
                reply = self._dispatch(request, cwd=str(fake_home))

            self.assertIn("error", reply)
            self.assertFalse(key_path.exists())

    def test_write_text_file_respects_safe_root(self) -> None:
        """A write outside ``HERMES_WRITE_SAFE_ROOT`` is refused."""
        with tempfile.TemporaryDirectory() as scratch:
            base = Path(scratch)
            allowed_root = base / "workspace"
            allowed_root.mkdir()
            escapee = base / "outside.txt"

            request = self._rpc_request(
                5,
                "fs/write_text_file",
                {
                    "path": str(escapee),
                    "content": "should-not-write",
                },
            )
            with patch.dict(
                os.environ,
                {"HERMES_WRITE_SAFE_ROOT": str(allowed_root)},
                clear=False,
            ):
                reply = self._dispatch(request, cwd=str(base))

            self.assertIn("error", reply)
            self.assertFalse(escapee.exists())


if __name__ == "__main__":
    unittest.main()