diff --git a/agent/copilot_acp_client.py b/agent/copilot_acp_client.py index 7a0d3dfd6..783f94956 100644 --- a/agent/copilot_acp_client.py +++ b/agent/copilot_acp_client.py @@ -21,6 +21,9 @@ from pathlib import Path from types import SimpleNamespace from typing import Any +from agent.file_safety import get_read_block_error, is_write_denied +from agent.redact import redact_sensitive_text + ACP_MARKER_BASE_URL = "acp://copilot" _DEFAULT_TIMEOUT_SECONDS = 900.0 @@ -54,6 +57,18 @@ def _jsonrpc_error(message_id: Any, code: int, message: str) -> dict[str, Any]: } +def _permission_denied(message_id: Any) -> dict[str, Any]: + return { + "jsonrpc": "2.0", + "id": message_id, + "result": { + "outcome": { + "outcome": "cancelled", + } + }, + } + + def _format_messages_as_prompt( messages: list[dict[str, Any]], model: str | None = None, @@ -535,18 +550,13 @@ class CopilotACPClient: params = msg.get("params") or {} if method == "session/request_permission": - response = { - "jsonrpc": "2.0", - "id": message_id, - "result": { - "outcome": { - "outcome": "allow_once", - } - }, - } + response = _permission_denied(message_id) elif method == "fs/read_text_file": try: path = _ensure_path_within_cwd(str(params.get("path") or ""), cwd) + block_error = get_read_block_error(str(path)) + if block_error: + raise PermissionError(block_error) content = path.read_text() if path.exists() else "" line = params.get("line") limit = params.get("limit") @@ -555,6 +565,8 @@ class CopilotACPClient: start = line - 1 end = start + limit if isinstance(limit, int) and limit > 0 else None content = "".join(lines[start:end]) + if content: + content = redact_sensitive_text(content) response = { "jsonrpc": "2.0", "id": message_id, @@ -567,6 +579,10 @@ class CopilotACPClient: elif method == "fs/write_text_file": try: path = _ensure_path_within_cwd(str(params.get("path") or ""), cwd) + if is_write_denied(str(path)): + raise PermissionError( + f"Write denied: '{path}' is a protected system/credential file." + ) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(str(params.get("content") or "")) response = { diff --git a/agent/file_safety.py b/agent/file_safety.py new file mode 100644 index 000000000..09da46caf --- /dev/null +++ b/agent/file_safety.py @@ -0,0 +1,111 @@ +"""Shared file safety rules used by both tools and ACP shims.""" + +from __future__ import annotations + +import os +from pathlib import Path +from typing import Optional + + +def _hermes_home_path() -> Path: + """Resolve the active HERMES_HOME (profile-aware) without circular imports.""" + try: + from hermes_constants import get_hermes_home # local import to avoid cycles + return get_hermes_home() + except Exception: + return Path(os.path.expanduser("~/.hermes")) + + +def build_write_denied_paths(home: str) -> set[str]: + """Return exact sensitive paths that must never be written.""" + hermes_home = _hermes_home_path() + return { + os.path.realpath(p) + for p in [ + os.path.join(home, ".ssh", "authorized_keys"), + os.path.join(home, ".ssh", "id_rsa"), + os.path.join(home, ".ssh", "id_ed25519"), + os.path.join(home, ".ssh", "config"), + str(hermes_home / ".env"), + os.path.join(home, ".bashrc"), + os.path.join(home, ".zshrc"), + os.path.join(home, ".profile"), + os.path.join(home, ".bash_profile"), + os.path.join(home, ".zprofile"), + os.path.join(home, ".netrc"), + os.path.join(home, ".pgpass"), + os.path.join(home, ".npmrc"), + os.path.join(home, ".pypirc"), + "/etc/sudoers", + "/etc/passwd", + "/etc/shadow", + ] + } + + +def build_write_denied_prefixes(home: str) -> list[str]: + """Return sensitive directory prefixes that must never be written.""" + return [ + os.path.realpath(p) + os.sep + for p in [ + os.path.join(home, ".ssh"), + os.path.join(home, ".aws"), + os.path.join(home, ".gnupg"), + os.path.join(home, ".kube"), + "/etc/sudoers.d", + "/etc/systemd", + os.path.join(home, ".docker"), + os.path.join(home, ".azure"), + os.path.join(home, ".config", "gh"), + ] + ] + + +def get_safe_write_root() -> Optional[str]: + """Return the resolved HERMES_WRITE_SAFE_ROOT path, or None if unset.""" + root = os.getenv("HERMES_WRITE_SAFE_ROOT", "") + if not root: + return None + try: + return os.path.realpath(os.path.expanduser(root)) + except Exception: + return None + + +def is_write_denied(path: str) -> bool: + """Return True if path is blocked by the write denylist or safe root.""" + home = os.path.realpath(os.path.expanduser("~")) + resolved = os.path.realpath(os.path.expanduser(str(path))) + + if resolved in build_write_denied_paths(home): + return True + for prefix in build_write_denied_prefixes(home): + if resolved.startswith(prefix): + return True + + safe_root = get_safe_write_root() + if safe_root and not (resolved == safe_root or resolved.startswith(safe_root + os.sep)): + return True + + return False + + +def get_read_block_error(path: str) -> Optional[str]: + """Return an error message when a read targets internal Hermes cache files.""" + resolved = Path(path).expanduser().resolve() + hermes_home = _hermes_home_path().resolve() + blocked_dirs = [ + hermes_home / "skills" / ".hub" / "index-cache", + hermes_home / "skills" / ".hub", + ] + for blocked in blocked_dirs: + try: + resolved.relative_to(blocked) + except ValueError: + continue + return ( + f"Access denied: {path} is an internal Hermes cache file " + "and cannot be read directly to prevent prompt injection. " + "Use the skills_list or skill_view tools instead." + ) + return None diff --git a/tests/agent/test_copilot_acp_client.py b/tests/agent/test_copilot_acp_client.py new file mode 100644 index 000000000..ce481a57b --- /dev/null +++ b/tests/agent/test_copilot_acp_client.py @@ -0,0 +1,142 @@ +"""Focused regressions for the Copilot ACP shim safety layer.""" + +from __future__ import annotations + +import io +import json +import os +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + +from agent.copilot_acp_client import CopilotACPClient + + +class _FakeProcess: + def __init__(self) -> None: + self.stdin = io.StringIO() + + +class CopilotACPClientSafetyTests(unittest.TestCase): + def setUp(self) -> None: + self.client = CopilotACPClient(acp_cwd="/tmp") + + def _dispatch(self, message: dict, *, cwd: str) -> dict: + process = _FakeProcess() + handled = self.client._handle_server_message( + message, + process=process, + cwd=cwd, + text_parts=[], + reasoning_parts=[], + ) + self.assertTrue(handled) + payload = process.stdin.getvalue().strip() + self.assertTrue(payload) + return json.loads(payload) + + def test_request_permission_is_not_auto_allowed(self) -> None: + response = self._dispatch( + { + "jsonrpc": "2.0", + "id": 1, + "method": "session/request_permission", + "params": {}, + }, + cwd="/tmp", + ) + + outcome = (((response.get("result") or {}).get("outcome") or {}).get("outcome")) + self.assertEqual(outcome, "cancelled") + + def test_read_text_file_blocks_internal_hermes_hub_files(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + home = Path(tmpdir) / "home" + blocked = home / ".hermes" / "skills" / ".hub" / "index-cache" / "entry.json" + blocked.parent.mkdir(parents=True, exist_ok=True) + blocked.write_text('{"token":"sk-test-secret-1234567890"}') + + with patch.dict(os.environ, {"HOME": str(home)}, clear=False): + response = self._dispatch( + { + "jsonrpc": "2.0", + "id": 2, + "method": "fs/read_text_file", + "params": {"path": str(blocked)}, + }, + cwd=str(home), + ) + + self.assertIn("error", response) + + def test_read_text_file_redacts_sensitive_content(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + secret_file = root / "config.env" + secret_file.write_text("OPENAI_API_KEY=sk-proj-abc123def456ghi789jkl012") + + response = self._dispatch( + { + "jsonrpc": "2.0", + "id": 3, + "method": "fs/read_text_file", + "params": {"path": str(secret_file)}, + }, + cwd=str(root), + ) + + content = ((response.get("result") or {}).get("content") or "") + self.assertNotIn("abc123def456", content) + self.assertIn("OPENAI_API_KEY=", content) + + def test_write_text_file_reuses_write_denylist(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + home = Path(tmpdir) / "home" + target = home / ".ssh" / "id_rsa" + target.parent.mkdir(parents=True, exist_ok=True) + + with patch("agent.copilot_acp_client.is_write_denied", return_value=True, create=True): + response = self._dispatch( + { + "jsonrpc": "2.0", + "id": 4, + "method": "fs/write_text_file", + "params": { + "path": str(target), + "content": "fake-private-key", + }, + }, + cwd=str(home), + ) + + self.assertIn("error", response) + self.assertFalse(target.exists()) + + def test_write_text_file_respects_safe_root(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + safe_root = root / "workspace" + safe_root.mkdir() + outside = root / "outside.txt" + + with patch.dict(os.environ, {"HERMES_WRITE_SAFE_ROOT": str(safe_root)}, clear=False): + response = self._dispatch( + { + "jsonrpc": "2.0", + "id": 5, + "method": "fs/write_text_file", + "params": { + "path": str(outside), + "content": "should-not-write", + }, + }, + cwd=str(root), + ) + + self.assertIn("error", response) + self.assertFalse(outside.exists()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/file_operations.py b/tools/file_operations.py index 8c3897bb2..59070d7ce 100644 --- a/tools/file_operations.py +++ b/tools/file_operations.py @@ -35,6 +35,13 @@ from pathlib import Path from hermes_constants import get_hermes_home from tools.binary_extensions import BINARY_EXTENSIONS +from agent.file_safety import ( + build_write_denied_paths, + build_write_denied_prefixes, + get_safe_write_root as _shared_get_safe_write_root, + is_write_denied as _shared_is_write_denied, +) + # --------------------------------------------------------------------------- # Write-path deny list — blocks writes to sensitive system/credential files @@ -42,41 +49,9 @@ from tools.binary_extensions import BINARY_EXTENSIONS _HOME = str(Path.home()) -WRITE_DENIED_PATHS = { - os.path.realpath(p) for p in [ - os.path.join(_HOME, ".ssh", "authorized_keys"), - os.path.join(_HOME, ".ssh", "id_rsa"), - os.path.join(_HOME, ".ssh", "id_ed25519"), - os.path.join(_HOME, ".ssh", "config"), - str(get_hermes_home() / ".env"), - os.path.join(_HOME, ".bashrc"), - os.path.join(_HOME, ".zshrc"), - os.path.join(_HOME, ".profile"), - os.path.join(_HOME, ".bash_profile"), - os.path.join(_HOME, ".zprofile"), - os.path.join(_HOME, ".netrc"), - os.path.join(_HOME, ".pgpass"), - os.path.join(_HOME, ".npmrc"), - os.path.join(_HOME, ".pypirc"), - "/etc/sudoers", - "/etc/passwd", - "/etc/shadow", - ] -} +WRITE_DENIED_PATHS = build_write_denied_paths(_HOME) -WRITE_DENIED_PREFIXES = [ - os.path.realpath(p) + os.sep for p in [ - os.path.join(_HOME, ".ssh"), - os.path.join(_HOME, ".aws"), - os.path.join(_HOME, ".gnupg"), - os.path.join(_HOME, ".kube"), - "/etc/sudoers.d", - "/etc/systemd", - os.path.join(_HOME, ".docker"), - os.path.join(_HOME, ".azure"), - os.path.join(_HOME, ".config", "gh"), - ] -] +WRITE_DENIED_PREFIXES = build_write_denied_prefixes(_HOME) def _get_safe_write_root() -> Optional[str]: @@ -87,33 +62,12 @@ def _get_safe_write_root() -> Optional[str]: not on the static deny list. Opt-in hardening for gateway/messaging deployments that should only touch a workspace checkout. """ - root = os.getenv("HERMES_WRITE_SAFE_ROOT", "") - if not root: - return None - try: - return os.path.realpath(os.path.expanduser(root)) - except Exception: - return None + return _shared_get_safe_write_root() def _is_write_denied(path: str) -> bool: """Return True if path is on the write deny list.""" - resolved = os.path.realpath(os.path.expanduser(str(path))) - - # 1) Static deny list - if resolved in WRITE_DENIED_PATHS: - return True - for prefix in WRITE_DENIED_PREFIXES: - if resolved.startswith(prefix): - return True - - # 2) Optional safe-root sandbox - safe_root = _get_safe_write_root() - if safe_root: - if not (resolved == safe_root or resolved.startswith(safe_root + os.sep)): - return True - - return False + return _shared_is_write_denied(path) # ============================================================================= diff --git a/tools/file_tools.py b/tools/file_tools.py index 3b2044c9d..af6701f82 100644 --- a/tools/file_tools.py +++ b/tools/file_tools.py @@ -7,6 +7,9 @@ import logging import os import threading from pathlib import Path +from typing import Optional + +from agent.file_safety import get_read_block_error from tools.binary_extensions import has_binary_extension from tools.file_operations import ShellFileOperations from agent.redact import redact_sensitive_text @@ -373,24 +376,9 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = # ── Hermes internal path guard ──────────────────────────────── # Prevent prompt injection via catalog or hub metadata files. - from hermes_constants import get_hermes_home as _get_hh - _hermes_home = _get_hh().resolve() - _blocked_dirs = [ - _hermes_home / "skills" / ".hub" / "index-cache", - _hermes_home / "skills" / ".hub", - ] - for _blocked in _blocked_dirs: - try: - _resolved.relative_to(_blocked) - return json.dumps({ - "error": ( - f"Access denied: {path} is an internal Hermes cache file " - "and cannot be read directly to prevent prompt injection. " - "Use the skills_list or skill_view tools instead." - ) - }) - except ValueError: - pass + block_error = get_read_block_error(path) + if block_error: + return json.dumps({"error": block_error}) # ── Dedup check ─────────────────────────────────────────────── # If we already read this exact (path, offset, limit) and the