# hermes-agent/tests/agent/test_copilot_acp_client.py
#
# File-viewer metadata carried over with the paste:
# 142 lines, 4.7 KiB, Python
#

"""Focused regressions for the Copilot ACP shim safety layer."""
from __future__ import annotations
import io
import json
import os
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from agent.copilot_acp_client import CopilotACPClient
class _FakeProcess:
    """Minimal stand-in for the process object handed to the client.

    Only ``stdin`` is provided: an in-memory text buffer, so a test can read
    back whatever the client wrote to the process.
    """

    def __init__(self) -> None:
        # A fresh buffer per instance keeps replies from leaking between tests.
        self.stdin = io.StringIO()
class CopilotACPClientSafetyTests(unittest.TestCase):
    """Regression tests for how the Copilot ACP client answers server requests.

    Each test pushes a single JSON-RPC request through
    ``CopilotACPClient._handle_server_message`` and inspects the reply the
    client writes to the fake process's ``stdin``.
    """

    def setUp(self) -> None:
        """Create a fresh client for every test."""
        self.client = CopilotACPClient(acp_cwd="/tmp")

    def _dispatch(self, message: dict, *, cwd: str) -> dict:
        """Send *message* to the client and return its decoded JSON reply.

        Fails the calling test when the client reports the message as
        unhandled or writes nothing back to the process.
        """
        process = _FakeProcess()
        handled = self.client._handle_server_message(
            message,
            process=process,
            cwd=cwd,
            text_parts=[],
            reasoning_parts=[],
        )
        self.assertTrue(handled, "client did not handle the server message")
        payload = process.stdin.getvalue().strip()
        self.assertTrue(payload, "client wrote no reply to the process stdin")
        return json.loads(payload)

    def test_request_permission_is_not_auto_allowed(self) -> None:
        """A permission request must be answered with outcome ``cancelled``."""
        response = self._dispatch(
            {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "session/request_permission",
                "params": {},
            },
            cwd="/tmp",
        )
        # Tolerate missing/None levels so a malformed reply fails the equality
        # check below instead of raising AttributeError.
        result = response.get("result") or {}
        outcome = (result.get("outcome") or {}).get("outcome")
        self.assertEqual(outcome, "cancelled")

    def test_read_text_file_blocks_internal_hermes_hub_files(self) -> None:
        """Reading a file under ``~/.hermes/skills/.hub`` must return an error."""
        secret = "sk-test-secret-1234567890"
        with tempfile.TemporaryDirectory() as tmpdir:
            home = Path(tmpdir) / "home"
            blocked = home / ".hermes" / "skills" / ".hub" / "index-cache" / "entry.json"
            blocked.parent.mkdir(parents=True, exist_ok=True)
            blocked.write_text(f'{{"token":"{secret}"}}')
            # Point HOME at the sandbox so the path sits under the fake home.
            with patch.dict(os.environ, {"HOME": str(home)}, clear=False):
                response = self._dispatch(
                    {
                        "jsonrpc": "2.0",
                        "id": 2,
                        "method": "fs/read_text_file",
                        "params": {"path": str(blocked)},
                    },
                    cwd=str(home),
                )
            self.assertIn("error", response)
            # The refusal itself must not echo the file's contents.
            self.assertNotIn(secret, json.dumps(response))

    def test_read_text_file_redacts_sensitive_content(self) -> None:
        """A readable file is returned with the key's value masked."""
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            secret_file = root / "config.env"
            secret_file.write_text("OPENAI_API_KEY=sk-proj-abc123def456ghi789jkl012")
            response = self._dispatch(
                {
                    "jsonrpc": "2.0",
                    "id": 3,
                    "method": "fs/read_text_file",
                    "params": {"path": str(secret_file)},
                },
                cwd=str(root),
            )
            content = (response.get("result") or {}).get("content") or ""
            # The secret body is gone, but the variable name is still visible.
            self.assertNotIn("abc123def456", content)
            self.assertIn("OPENAI_API_KEY=", content)

    def test_write_text_file_reuses_write_denylist(self) -> None:
        """A write is refused when ``is_write_denied`` reports the path as denied."""
        with tempfile.TemporaryDirectory() as tmpdir:
            home = Path(tmpdir) / "home"
            target = home / ".ssh" / "id_rsa"
            target.parent.mkdir(parents=True, exist_ok=True)
            # create=True lets the patch apply even if the client module does
            # not currently expose the name at module level.
            with patch(
                "agent.copilot_acp_client.is_write_denied",
                return_value=True,
                create=True,
            ):
                response = self._dispatch(
                    {
                        "jsonrpc": "2.0",
                        "id": 4,
                        "method": "fs/write_text_file",
                        "params": {
                            "path": str(target),
                            "content": "fake-private-key",
                        },
                    },
                    cwd=str(home),
                )
            self.assertIn("error", response)
            # Checked while the temp dir still exists; after cleanup this
            # assertion would pass vacuously.
            self.assertFalse(target.exists())

    def test_write_text_file_respects_safe_root(self) -> None:
        """A write outside ``HERMES_WRITE_SAFE_ROOT`` is refused."""
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            safe_root = root / "workspace"
            safe_root.mkdir()
            outside = root / "outside.txt"
            with patch.dict(
                os.environ,
                {"HERMES_WRITE_SAFE_ROOT": str(safe_root)},
                clear=False,
            ):
                response = self._dispatch(
                    {
                        "jsonrpc": "2.0",
                        "id": 5,
                        "method": "fs/write_text_file",
                        "params": {
                            "path": str(outside),
                            "content": "should-not-write",
                        },
                    },
                    cwd=str(root),
                )
            self.assertIn("error", response)
            # Checked while the temp dir still exists; after cleanup this
            # assertion would pass vacuously.
            self.assertFalse(outside.exists())
if __name__ == "__main__":
    # Allow running this test module directly as a script.
    unittest.main()