fix(acp): inline file attachment resources

This commit is contained in:
HenkDz 2026-05-07 17:07:57 +01:00 committed by Teknium
parent 498bfc7bc1
commit 733e297b8a
2 changed files with 242 additions and 3 deletions

View file

@ -3,13 +3,16 @@
from __future__ import annotations
import asyncio
import base64
import contextvars
import json
import logging
import os
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any, Deque, Optional
from urllib.parse import unquote, urlparse
import acp
from acp.schema import (
@ -18,6 +21,7 @@ from acp.schema import (
AuthenticateResponse,
AvailableCommand,
AvailableCommandsUpdate,
BlobResourceContents,
ClientCapabilities,
EmbeddedResourceContentBlock,
ForkSessionResponse,
@ -46,6 +50,7 @@ from acp.schema import (
SessionResumeCapabilities,
SessionInfo,
TextContentBlock,
TextResourceContents,
UnstructuredCommandInput,
Usage,
UsageUpdate,
@ -83,6 +88,179 @@ _executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="acp-agent")
# does not expose a client-side limit, so this is a fixed cap that clients
# paginate against using `cursor` / `next_cursor`.
_LIST_SESSIONS_PAGE_SIZE = 50
_MAX_ACP_RESOURCE_BYTES = 512 * 1024
_TEXT_RESOURCE_MIME_PREFIXES = ("text/",)
_TEXT_RESOURCE_MIME_TYPES = {
"application/json",
"application/javascript",
"application/typescript",
"application/xml",
"application/x-yaml",
"application/yaml",
"application/toml",
"application/sql",
}
def _resource_display_name(uri: str, name: str | None = None, title: str | None = None) -> str:
"""Human-readable attachment name for prompt context."""
raw_name = (name or "").strip()
raw_title = (title or "").strip()
if raw_title and raw_name and raw_title != raw_name:
return f"{raw_title} ({raw_name})"
if raw_title:
return raw_title
if raw_name:
return raw_name
parsed = urlparse(uri)
candidate = parsed.path if parsed.scheme else uri
return Path(unquote(candidate)).name or uri or "resource"
def _is_text_resource(mime_type: str | None) -> bool:
mime = (mime_type or "").split(";", 1)[0].strip().lower()
if not mime:
return False
return mime.startswith(_TEXT_RESOURCE_MIME_PREFIXES) or mime in _TEXT_RESOURCE_MIME_TYPES
def _path_from_file_uri(uri: str) -> Path | None:
"""Convert local file URIs/paths from ACP clients into a readable Path.
Zed may send POSIX file URIs from Linux/WSL workspaces or Windows-ish paths
when launched through wsl.exe. Translate the common Windows drive form to
/mnt/<drive>/... so Hermes running in WSL can read it.
"""
raw = (uri or "").strip()
if not raw:
return None
parsed = urlparse(raw)
if parsed.scheme and parsed.scheme != "file":
return None
if parsed.scheme == "file":
if parsed.netloc and parsed.netloc not in {"", "localhost"}:
return None
path_text = unquote(parsed.path or "")
else:
path_text = unquote(raw)
# file:///C:/Users/... or C:\Users\...
if len(path_text) >= 3 and path_text[0] == "/" and path_text[2] == ":" and path_text[1].isalpha():
drive = path_text[1].lower()
rest = path_text[3:].lstrip("/\\").replace("\\", "/")
return Path("/mnt") / drive / rest
if len(path_text) >= 2 and path_text[1] == ":" and path_text[0].isalpha():
drive = path_text[0].lower()
rest = path_text[2:].lstrip("/\\").replace("\\", "/")
return Path("/mnt") / drive / rest
return Path(path_text)
def _decode_text_bytes(data: bytes, mime_type: str | None) -> str | None:
"""Decode resource bytes if they are probably text; return None for binary."""
if b"\x00" in data and not _is_text_resource(mime_type):
return None
for encoding in ("utf-8-sig", "utf-8", "latin-1"):
try:
return data.decode(encoding)
except UnicodeDecodeError:
continue
return data.decode("utf-8", errors="replace")
def _format_resource_text(
*,
uri: str,
body: str,
name: str | None = None,
title: str | None = None,
note: str | None = None,
) -> str:
display = _resource_display_name(uri, name=name, title=title)
header = f"[Attached file: {display}]"
if note:
header += f" ({note})"
return f"{header}\nURI: {uri}\n\n{body}"
def _resource_link_to_text(block: ResourceContentBlock) -> str | None:
uri = str(getattr(block, "uri", "") or "").strip()
if not uri:
return None
name = str(getattr(block, "name", "") or "").strip() or None
title = str(getattr(block, "title", "") or "").strip() or None
mime_type = str(getattr(block, "mime_type", "") or "").strip() or None
path = _path_from_file_uri(uri)
if path is None:
return _format_resource_text(
uri=uri,
name=name,
title=title,
body="[Resource link only; Hermes cannot read non-file ACP resource URIs directly.]",
)
try:
size = path.stat().st_size
read_size = min(size, _MAX_ACP_RESOURCE_BYTES)
with path.open("rb") as fh:
data = fh.read(read_size)
text = _decode_text_bytes(data, mime_type)
if text is None:
return _format_resource_text(
uri=uri,
name=name,
title=title,
body=f"[Binary file omitted: {size} bytes, mime={mime_type or 'unknown'}]",
)
note = None
if size > _MAX_ACP_RESOURCE_BYTES:
note = f"truncated to {_MAX_ACP_RESOURCE_BYTES} of {size} bytes"
return _format_resource_text(uri=uri, name=name, title=title, body=text, note=note)
except OSError as exc:
logger.warning("ACP resource read failed: %s", uri, exc_info=True)
return _format_resource_text(
uri=uri,
name=name,
title=title,
body=f"[Could not read attached file: {exc}]",
)
def _embedded_resource_to_text(block: EmbeddedResourceContentBlock) -> str | None:
resource = getattr(block, "resource", None)
if resource is None:
return None
uri = str(getattr(resource, "uri", "") or "").strip()
mime_type = str(getattr(resource, "mime_type", "") or "").strip() or None
if isinstance(resource, TextResourceContents):
return _format_resource_text(uri=uri, body=resource.text)
if isinstance(resource, BlobResourceContents):
blob = resource.blob or ""
try:
data = base64.b64decode(blob, validate=True)
except Exception:
data = blob.encode("utf-8", errors="replace")
text = _decode_text_bytes(data[:_MAX_ACP_RESOURCE_BYTES], mime_type)
if text is None:
body = f"[Binary embedded file omitted: {len(data)} bytes, mime={mime_type or 'unknown'}]"
else:
body = text
if len(data) > _MAX_ACP_RESOURCE_BYTES:
body += f"\n\n[Truncated to {_MAX_ACP_RESOURCE_BYTES} of {len(data)} bytes]"
return _format_resource_text(uri=uri, body=body)
text = getattr(resource, "text", None)
if text:
return _format_resource_text(uri=uri, body=str(text))
return None
def _extract_text(
@ -144,6 +322,18 @@ def _content_blocks_to_openai_user_content(
if image_part is not None:
parts.append(image_part)
continue
if isinstance(block, ResourceContentBlock):
resource_text = _resource_link_to_text(block)
if resource_text:
parts.append({"type": "text", "text": resource_text})
text_parts.append(resource_text)
continue
if isinstance(block, EmbeddedResourceContentBlock):
resource_text = _embedded_resource_to_text(block)
if resource_text:
parts.append({"type": "text", "text": resource_text})
text_parts.append(resource_text)
continue
if not parts:
return _extract_text(prompt)
@ -803,6 +993,7 @@ class HermesACPAgent(acp.Agent):
user_text = _extract_text(prompt).strip()
user_content = _content_blocks_to_openai_user_content(prompt)
text_only_prompt = all(isinstance(block, TextContentBlock) for block in prompt)
has_content = bool(user_text) or (
isinstance(user_content, list) and bool(user_content)
)
@ -821,7 +1012,7 @@ class HermesACPAgent(acp.Agent):
# silently append to state.queued_prompts and respond with
# "No active turn — queued for the next turn", which looks like
# /queue even though the user never typed /queue.
if isinstance(user_content, str) and user_text.startswith("/steer"):
if text_only_prompt and isinstance(user_content, str) and user_text.startswith("/steer"):
steer_text = user_text.split(maxsplit=1)[1].strip() if len(user_text.split(maxsplit=1)) > 1 else ""
interrupted_prompt = ""
rewrite_idle = False
@ -846,7 +1037,7 @@ class HermesACPAgent(acp.Agent):
# Slash commands are text-only; if the client included images/resources,
# send the whole multimodal prompt to the agent instead of treating it as
# an ACP command.
if isinstance(user_content, str) and user_text.startswith("/"):
if text_only_prompt and isinstance(user_content, str) and user_text.startswith("/"):
response_text = self._handle_slash_command(user_text, state)
if response_text is not None:
if self._conn:

View file

@ -1,5 +1,11 @@
import pytest
from acp.schema import ImageContentBlock, TextContentBlock
from acp.schema import (
EmbeddedResourceContentBlock,
ImageContentBlock,
ResourceContentBlock,
TextContentBlock,
TextResourceContents,
)
from acp_adapter.server import HermesACPAgent, _content_blocks_to_openai_user_content
@ -27,6 +33,48 @@ def test_text_only_acp_blocks_stay_string_for_legacy_prompt_path():
assert content == "/help"
def test_acp_resource_link_file_is_inlined_as_text(tmp_path):
attached = tmp_path / "notes.md"
attached.write_text("# Notes\n\nAttached file body", encoding="utf-8")
content = _content_blocks_to_openai_user_content([
TextContentBlock(type="text", text="Please read this file"),
ResourceContentBlock(
type="resource_link",
name="notes.md",
title="Project notes",
uri=attached.as_uri(),
mimeType="text/markdown",
),
])
assert content == (
"Please read this file\n"
"[Attached file: Project notes (notes.md)]\n"
f"URI: {attached.as_uri()}\n\n"
"# Notes\n\nAttached file body"
)
def test_acp_embedded_text_resource_is_inlined_as_text():
content = _content_blocks_to_openai_user_content([
EmbeddedResourceContentBlock(
type="resource",
resource=TextResourceContents(
uri="file:///workspace/todo.txt",
mimeType="text/plain",
text="first\nsecond",
),
),
])
assert content == (
"[Attached file: todo.txt]\n"
"URI: file:///workspace/todo.txt\n\n"
"first\nsecond"
)
@pytest.mark.asyncio
async def test_initialize_advertises_image_prompt_capability():
response = await HermesACPAgent().initialize()