From 733e297b8a5c7ab277db331672c206134587ffa7 Mon Sep 17 00:00:00 2001 From: HenkDz Date: Thu, 7 May 2026 17:07:57 +0100 Subject: [PATCH] fix(acp): inline file attachment resources --- acp_adapter/server.py | 195 ++++++++++++++++++++++++++- tests/acp_adapter/test_acp_images.py | 50 ++++++- 2 files changed, 242 insertions(+), 3 deletions(-) diff --git a/acp_adapter/server.py b/acp_adapter/server.py index dd9d75af9c..4948ebdc73 100644 --- a/acp_adapter/server.py +++ b/acp_adapter/server.py @@ -3,13 +3,16 @@ from __future__ import annotations import asyncio +import base64 import contextvars import json import logging import os from collections import defaultdict, deque from concurrent.futures import ThreadPoolExecutor +from pathlib import Path from typing import Any, Deque, Optional +from urllib.parse import unquote, urlparse import acp from acp.schema import ( @@ -18,6 +21,7 @@ from acp.schema import ( AuthenticateResponse, AvailableCommand, AvailableCommandsUpdate, + BlobResourceContents, ClientCapabilities, EmbeddedResourceContentBlock, ForkSessionResponse, @@ -46,6 +50,7 @@ from acp.schema import ( SessionResumeCapabilities, SessionInfo, TextContentBlock, + TextResourceContents, UnstructuredCommandInput, Usage, UsageUpdate, @@ -83,6 +88,179 @@ _executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="acp-agent") # does not expose a client-side limit, so this is a fixed cap that clients # paginate against using `cursor` / `next_cursor`. _LIST_SESSIONS_PAGE_SIZE = 50 +_MAX_ACP_RESOURCE_BYTES = 512 * 1024 +_TEXT_RESOURCE_MIME_PREFIXES = ("text/",) +_TEXT_RESOURCE_MIME_TYPES = { + "application/json", + "application/javascript", + "application/typescript", + "application/xml", + "application/x-yaml", + "application/yaml", + "application/toml", + "application/sql", +} + + +def _resource_display_name(uri: str, name: str | None = None, title: str | None = None) -> str: + """Human-readable attachment name for prompt context.""" + raw_name = (name or "").strip() + raw_title = (title or "").strip() + if raw_title and raw_name and raw_title != raw_name: + return f"{raw_title} ({raw_name})" + if raw_title: + return raw_title + if raw_name: + return raw_name + parsed = urlparse(uri) + candidate = parsed.path if parsed.scheme else uri + return Path(unquote(candidate)).name or uri or "resource" + + +def _is_text_resource(mime_type: str | None) -> bool: + mime = (mime_type or "").split(";", 1)[0].strip().lower() + if not mime: + return False + return mime.startswith(_TEXT_RESOURCE_MIME_PREFIXES) or mime in _TEXT_RESOURCE_MIME_TYPES + + +def _path_from_file_uri(uri: str) -> Path | None: + """Convert local file URIs/paths from ACP clients into a readable Path. + + Zed may send POSIX file URIs from Linux/WSL workspaces or Windows-ish paths + when launched through wsl.exe. Translate the common Windows drive form to + /mnt//... so Hermes running in WSL can read it. + """ + raw = (uri or "").strip() + if not raw: + return None + + parsed = urlparse(raw) + if parsed.scheme and parsed.scheme != "file": + return None + + if parsed.scheme == "file": + if parsed.netloc and parsed.netloc not in {"", "localhost"}: + return None + path_text = unquote(parsed.path or "") + else: + path_text = unquote(raw) + + # file:///C:/Users/... or C:\Users\... + if len(path_text) >= 3 and path_text[0] == "/" and path_text[2] == ":" and path_text[1].isalpha(): + drive = path_text[1].lower() + rest = path_text[3:].lstrip("/\\").replace("\\", "/") + return Path("/mnt") / drive / rest + if len(path_text) >= 2 and path_text[1] == ":" and path_text[0].isalpha(): + drive = path_text[0].lower() + rest = path_text[2:].lstrip("/\\").replace("\\", "/") + return Path("/mnt") / drive / rest + + return Path(path_text) + + +def _decode_text_bytes(data: bytes, mime_type: str | None) -> str | None: + """Decode resource bytes if they are probably text; return None for binary.""" + if b"\x00" in data and not _is_text_resource(mime_type): + return None + for encoding in ("utf-8-sig", "utf-8", "latin-1"): + try: + return data.decode(encoding) + except UnicodeDecodeError: + continue + return data.decode("utf-8", errors="replace") + + +def _format_resource_text( + *, + uri: str, + body: str, + name: str | None = None, + title: str | None = None, + note: str | None = None, +) -> str: + display = _resource_display_name(uri, name=name, title=title) + header = f"[Attached file: {display}]" + if note: + header += f" ({note})" + return f"{header}\nURI: {uri}\n\n{body}" + + +def _resource_link_to_text(block: ResourceContentBlock) -> str | None: + uri = str(getattr(block, "uri", "") or "").strip() + if not uri: + return None + + name = str(getattr(block, "name", "") or "").strip() or None + title = str(getattr(block, "title", "") or "").strip() or None + mime_type = str(getattr(block, "mime_type", "") or "").strip() or None + path = _path_from_file_uri(uri) + + if path is None: + return _format_resource_text( + uri=uri, + name=name, + title=title, + body="[Resource link only; Hermes cannot read non-file ACP resource URIs directly.]", + ) + + try: + size = path.stat().st_size + read_size = min(size, _MAX_ACP_RESOURCE_BYTES) + with path.open("rb") as fh: + data = fh.read(read_size) + text = _decode_text_bytes(data, mime_type) + if text is None: + return _format_resource_text( + uri=uri, + name=name, + title=title, + body=f"[Binary file omitted: {size} bytes, mime={mime_type or 'unknown'}]", + ) + note = None + if size > _MAX_ACP_RESOURCE_BYTES: + note = f"truncated to {_MAX_ACP_RESOURCE_BYTES} of {size} bytes" + return _format_resource_text(uri=uri, name=name, title=title, body=text, note=note) + except OSError as exc: + logger.warning("ACP resource read failed: %s", uri, exc_info=True) + return _format_resource_text( + uri=uri, + name=name, + title=title, + body=f"[Could not read attached file: {exc}]", + ) + + +def _embedded_resource_to_text(block: EmbeddedResourceContentBlock) -> str | None: + resource = getattr(block, "resource", None) + if resource is None: + return None + + uri = str(getattr(resource, "uri", "") or "").strip() + mime_type = str(getattr(resource, "mime_type", "") or "").strip() or None + + if isinstance(resource, TextResourceContents): + return _format_resource_text(uri=uri, body=resource.text) + + if isinstance(resource, BlobResourceContents): + blob = resource.blob or "" + try: + data = base64.b64decode(blob, validate=True) + except Exception: + data = blob.encode("utf-8", errors="replace") + text = _decode_text_bytes(data[:_MAX_ACP_RESOURCE_BYTES], mime_type) + if text is None: + body = f"[Binary embedded file omitted: {len(data)} bytes, mime={mime_type or 'unknown'}]" + else: + body = text + if len(data) > _MAX_ACP_RESOURCE_BYTES: + body += f"\n\n[Truncated to {_MAX_ACP_RESOURCE_BYTES} of {len(data)} bytes]" + return _format_resource_text(uri=uri, body=body) + + text = getattr(resource, "text", None) + if text: + return _format_resource_text(uri=uri, body=str(text)) + return None def _extract_text( @@ -144,6 +322,18 @@ def _content_blocks_to_openai_user_content( if image_part is not None: parts.append(image_part) continue + if isinstance(block, ResourceContentBlock): + resource_text = _resource_link_to_text(block) + if resource_text: + parts.append({"type": "text", "text": resource_text}) + text_parts.append(resource_text) + continue + if isinstance(block, EmbeddedResourceContentBlock): + resource_text = _embedded_resource_to_text(block) + if resource_text: + parts.append({"type": "text", "text": resource_text}) + text_parts.append(resource_text) + continue if not parts: return _extract_text(prompt) @@ -803,6 +993,7 @@ class HermesACPAgent(acp.Agent): user_text = _extract_text(prompt).strip() user_content = _content_blocks_to_openai_user_content(prompt) + text_only_prompt = all(isinstance(block, TextContentBlock) for block in prompt) has_content = bool(user_text) or ( isinstance(user_content, list) and bool(user_content) ) @@ -821,7 +1012,7 @@ class HermesACPAgent(acp.Agent): # silently append to state.queued_prompts and respond with # "No active turn — queued for the next turn", which looks like # /queue even though the user never typed /queue. - if isinstance(user_content, str) and user_text.startswith("/steer"): + if text_only_prompt and isinstance(user_content, str) and user_text.startswith("/steer"): steer_text = user_text.split(maxsplit=1)[1].strip() if len(user_text.split(maxsplit=1)) > 1 else "" interrupted_prompt = "" rewrite_idle = False @@ -846,7 +1037,7 @@ class HermesACPAgent(acp.Agent): # Slash commands are text-only; if the client included images/resources, # send the whole multimodal prompt to the agent instead of treating it as # an ACP command. - if isinstance(user_content, str) and user_text.startswith("/"): + if text_only_prompt and isinstance(user_content, str) and user_text.startswith("/"): response_text = self._handle_slash_command(user_text, state) if response_text is not None: if self._conn: diff --git a/tests/acp_adapter/test_acp_images.py b/tests/acp_adapter/test_acp_images.py index 03d37840f3..6574472e10 100644 --- a/tests/acp_adapter/test_acp_images.py +++ b/tests/acp_adapter/test_acp_images.py @@ -1,5 +1,11 @@ import pytest -from acp.schema import ImageContentBlock, TextContentBlock +from acp.schema import ( + EmbeddedResourceContentBlock, + ImageContentBlock, + ResourceContentBlock, + TextContentBlock, + TextResourceContents, +) from acp_adapter.server import HermesACPAgent, _content_blocks_to_openai_user_content @@ -27,6 +33,48 @@ def test_text_only_acp_blocks_stay_string_for_legacy_prompt_path(): assert content == "/help" +def test_acp_resource_link_file_is_inlined_as_text(tmp_path): + attached = tmp_path / "notes.md" + attached.write_text("# Notes\n\nAttached file body", encoding="utf-8") + + content = _content_blocks_to_openai_user_content([ + TextContentBlock(type="text", text="Please read this file"), + ResourceContentBlock( + type="resource_link", + name="notes.md", + title="Project notes", + uri=attached.as_uri(), + mimeType="text/markdown", + ), + ]) + + assert content == ( + "Please read this file\n" + "[Attached file: Project notes (notes.md)]\n" + f"URI: {attached.as_uri()}\n\n" + "# Notes\n\nAttached file body" + ) + + +def test_acp_embedded_text_resource_is_inlined_as_text(): + content = _content_blocks_to_openai_user_content([ + EmbeddedResourceContentBlock( + type="resource", + resource=TextResourceContents( + uri="file:///workspace/todo.txt", + mimeType="text/plain", + text="first\nsecond", + ), + ), + ]) + + assert content == ( + "[Attached file: todo.txt]\n" + "URI: file:///workspace/todo.txt\n\n" + "first\nsecond" + ) + + @pytest.mark.asyncio async def test_initialize_advertises_image_prompt_capability(): response = await HermesACPAgent().initialize()