mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-08 03:01:47 +00:00
feat(acp): pass image file attachments through as image_url parts
Extends PR #21400's resource inlining with image-specific handling: ACP resource_link and embedded blob resources with an image/* mime (or image file suffix when mime is missing) now emit an OpenAI image_url part with a base64 data URL, so vision models actually see the image instead of a [Binary file omitted] note. Non-image resources keep the existing text-inlining behavior. Adds 3 tests: local PNG via resource_link, JPEG mime inferred from suffix when client omits mimeType, and embedded blob PNG.
This commit is contained in:
parent
733e297b8a
commit
7e2af0c2e8
2 changed files with 205 additions and 35 deletions
|
|
@ -124,6 +124,28 @@ def _is_text_resource(mime_type: str | None) -> bool:
|
||||||
return mime.startswith(_TEXT_RESOURCE_MIME_PREFIXES) or mime in _TEXT_RESOURCE_MIME_TYPES
|
return mime.startswith(_TEXT_RESOURCE_MIME_PREFIXES) or mime in _TEXT_RESOURCE_MIME_TYPES
|
||||||
|
|
||||||
|
|
||||||
|
def _is_image_resource(mime_type: str | None) -> bool:
|
||||||
|
mime = (mime_type or "").split(";", 1)[0].strip().lower()
|
||||||
|
return mime.startswith("image/")
|
||||||
|
|
||||||
|
|
||||||
|
def _guess_image_mime_from_path(path: Path) -> str | None:
|
||||||
|
suffix = path.suffix.lower()
|
||||||
|
return {
|
||||||
|
".png": "image/png",
|
||||||
|
".jpg": "image/jpeg",
|
||||||
|
".jpeg": "image/jpeg",
|
||||||
|
".gif": "image/gif",
|
||||||
|
".webp": "image/webp",
|
||||||
|
".bmp": "image/bmp",
|
||||||
|
".svg": "image/svg+xml",
|
||||||
|
}.get(suffix)
|
||||||
|
|
||||||
|
|
||||||
|
def _image_data_url(data: bytes, mime_type: str) -> str:
|
||||||
|
return f"data:{mime_type};base64,{base64.b64encode(data).decode('ascii')}"
|
||||||
|
|
||||||
|
|
||||||
def _path_from_file_uri(uri: str) -> Path | None:
|
def _path_from_file_uri(uri: str) -> Path | None:
|
||||||
"""Convert local file URIs/paths from ACP clients into a readable Path.
|
"""Convert local file URIs/paths from ACP clients into a readable Path.
|
||||||
|
|
||||||
|
|
@ -186,10 +208,17 @@ def _format_resource_text(
|
||||||
return f"{header}\nURI: {uri}\n\n{body}"
|
return f"{header}\nURI: {uri}\n\n{body}"
|
||||||
|
|
||||||
|
|
||||||
def _resource_link_to_text(block: ResourceContentBlock) -> str | None:
|
def _resource_link_to_parts(block: ResourceContentBlock) -> list[dict[str, Any]]:
|
||||||
|
"""Convert an ACP resource_link block to OpenAI content parts.
|
||||||
|
|
||||||
|
Returns a list of {"type": "text", ...} and/or {"type": "image_url", ...}
|
||||||
|
parts. Image resources produce an image_url part with a small text header
|
||||||
|
so the model knows which attachment it is. Non-image resources return a
|
||||||
|
single text part with the inlined file body (or a binary-omit note).
|
||||||
|
"""
|
||||||
uri = str(getattr(block, "uri", "") or "").strip()
|
uri = str(getattr(block, "uri", "") or "").strip()
|
||||||
if not uri:
|
if not uri:
|
||||||
return None
|
return []
|
||||||
|
|
||||||
name = str(getattr(block, "name", "") or "").strip() or None
|
name = str(getattr(block, "name", "") or "").strip() or None
|
||||||
title = str(getattr(block, "title", "") or "").strip() or None
|
title = str(getattr(block, "title", "") or "").strip() or None
|
||||||
|
|
@ -197,12 +226,50 @@ def _resource_link_to_text(block: ResourceContentBlock) -> str | None:
|
||||||
path = _path_from_file_uri(uri)
|
path = _path_from_file_uri(uri)
|
||||||
|
|
||||||
if path is None:
|
if path is None:
|
||||||
return _format_resource_text(
|
return [{
|
||||||
uri=uri,
|
"type": "text",
|
||||||
name=name,
|
"text": _format_resource_text(
|
||||||
title=title,
|
uri=uri,
|
||||||
body="[Resource link only; Hermes cannot read non-file ACP resource URIs directly.]",
|
name=name,
|
||||||
)
|
title=title,
|
||||||
|
body="[Resource link only; Hermes cannot read non-file ACP resource URIs directly.]",
|
||||||
|
),
|
||||||
|
}]
|
||||||
|
|
||||||
|
# Image files: emit a short text header + image_url data URL so vision
|
||||||
|
# models can see the attachment instead of a "binary omitted" note.
|
||||||
|
image_mime = mime_type if _is_image_resource(mime_type) else _guess_image_mime_from_path(path)
|
||||||
|
if image_mime and _is_image_resource(image_mime):
|
||||||
|
try:
|
||||||
|
size = path.stat().st_size
|
||||||
|
if size > _MAX_ACP_RESOURCE_BYTES:
|
||||||
|
return [{
|
||||||
|
"type": "text",
|
||||||
|
"text": _format_resource_text(
|
||||||
|
uri=uri,
|
||||||
|
name=name,
|
||||||
|
title=title,
|
||||||
|
body=f"[Image too large to inline: {size} bytes, cap={_MAX_ACP_RESOURCE_BYTES}]",
|
||||||
|
),
|
||||||
|
}]
|
||||||
|
with path.open("rb") as fh:
|
||||||
|
data = fh.read()
|
||||||
|
except OSError as exc:
|
||||||
|
logger.warning("ACP image resource read failed: %s", uri, exc_info=True)
|
||||||
|
return [{
|
||||||
|
"type": "text",
|
||||||
|
"text": _format_resource_text(
|
||||||
|
uri=uri,
|
||||||
|
name=name,
|
||||||
|
title=title,
|
||||||
|
body=f"[Could not read attached image: {exc}]",
|
||||||
|
),
|
||||||
|
}]
|
||||||
|
display = _resource_display_name(uri, name=name, title=title)
|
||||||
|
return [
|
||||||
|
{"type": "text", "text": f"[Attached image: {display}]\nURI: {uri}"},
|
||||||
|
{"type": "image_url", "image_url": {"url": _image_data_url(data, image_mime)}},
|
||||||
|
]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
size = path.stat().st_size
|
size = path.stat().st_size
|
||||||
|
|
@ -211,36 +278,45 @@ def _resource_link_to_text(block: ResourceContentBlock) -> str | None:
|
||||||
data = fh.read(read_size)
|
data = fh.read(read_size)
|
||||||
text = _decode_text_bytes(data, mime_type)
|
text = _decode_text_bytes(data, mime_type)
|
||||||
if text is None:
|
if text is None:
|
||||||
return _format_resource_text(
|
return [{
|
||||||
uri=uri,
|
"type": "text",
|
||||||
name=name,
|
"text": _format_resource_text(
|
||||||
title=title,
|
uri=uri,
|
||||||
body=f"[Binary file omitted: {size} bytes, mime={mime_type or 'unknown'}]",
|
name=name,
|
||||||
)
|
title=title,
|
||||||
|
body=f"[Binary file omitted: {size} bytes, mime={mime_type or 'unknown'}]",
|
||||||
|
),
|
||||||
|
}]
|
||||||
note = None
|
note = None
|
||||||
if size > _MAX_ACP_RESOURCE_BYTES:
|
if size > _MAX_ACP_RESOURCE_BYTES:
|
||||||
note = f"truncated to {_MAX_ACP_RESOURCE_BYTES} of {size} bytes"
|
note = f"truncated to {_MAX_ACP_RESOURCE_BYTES} of {size} bytes"
|
||||||
return _format_resource_text(uri=uri, name=name, title=title, body=text, note=note)
|
return [{
|
||||||
|
"type": "text",
|
||||||
|
"text": _format_resource_text(uri=uri, name=name, title=title, body=text, note=note),
|
||||||
|
}]
|
||||||
except OSError as exc:
|
except OSError as exc:
|
||||||
logger.warning("ACP resource read failed: %s", uri, exc_info=True)
|
logger.warning("ACP resource read failed: %s", uri, exc_info=True)
|
||||||
return _format_resource_text(
|
return [{
|
||||||
uri=uri,
|
"type": "text",
|
||||||
name=name,
|
"text": _format_resource_text(
|
||||||
title=title,
|
uri=uri,
|
||||||
body=f"[Could not read attached file: {exc}]",
|
name=name,
|
||||||
)
|
title=title,
|
||||||
|
body=f"[Could not read attached file: {exc}]",
|
||||||
|
),
|
||||||
|
}]
|
||||||
|
|
||||||
|
|
||||||
def _embedded_resource_to_text(block: EmbeddedResourceContentBlock) -> str | None:
|
def _embedded_resource_to_parts(block: EmbeddedResourceContentBlock) -> list[dict[str, Any]]:
|
||||||
resource = getattr(block, "resource", None)
|
resource = getattr(block, "resource", None)
|
||||||
if resource is None:
|
if resource is None:
|
||||||
return None
|
return []
|
||||||
|
|
||||||
uri = str(getattr(resource, "uri", "") or "").strip()
|
uri = str(getattr(resource, "uri", "") or "").strip()
|
||||||
mime_type = str(getattr(resource, "mime_type", "") or "").strip() or None
|
mime_type = str(getattr(resource, "mime_type", "") or "").strip() or None
|
||||||
|
|
||||||
if isinstance(resource, TextResourceContents):
|
if isinstance(resource, TextResourceContents):
|
||||||
return _format_resource_text(uri=uri, body=resource.text)
|
return [{"type": "text", "text": _format_resource_text(uri=uri, body=resource.text)}]
|
||||||
|
|
||||||
if isinstance(resource, BlobResourceContents):
|
if isinstance(resource, BlobResourceContents):
|
||||||
blob = resource.blob or ""
|
blob = resource.blob or ""
|
||||||
|
|
@ -248,6 +324,23 @@ def _embedded_resource_to_text(block: EmbeddedResourceContentBlock) -> str | Non
|
||||||
data = base64.b64decode(blob, validate=True)
|
data = base64.b64decode(blob, validate=True)
|
||||||
except Exception:
|
except Exception:
|
||||||
data = blob.encode("utf-8", errors="replace")
|
data = blob.encode("utf-8", errors="replace")
|
||||||
|
|
||||||
|
# Image blobs go through as image_url so vision models can see them.
|
||||||
|
if _is_image_resource(mime_type):
|
||||||
|
if len(data) > _MAX_ACP_RESOURCE_BYTES:
|
||||||
|
return [{
|
||||||
|
"type": "text",
|
||||||
|
"text": _format_resource_text(
|
||||||
|
uri=uri,
|
||||||
|
body=f"[Embedded image too large to inline: {len(data)} bytes, cap={_MAX_ACP_RESOURCE_BYTES}]",
|
||||||
|
),
|
||||||
|
}]
|
||||||
|
display = _resource_display_name(uri)
|
||||||
|
return [
|
||||||
|
{"type": "text", "text": f"[Attached image: {display}]" + (f"\nURI: {uri}" if uri else "")},
|
||||||
|
{"type": "image_url", "image_url": {"url": _image_data_url(data, mime_type or "image/png")}},
|
||||||
|
]
|
||||||
|
|
||||||
text = _decode_text_bytes(data[:_MAX_ACP_RESOURCE_BYTES], mime_type)
|
text = _decode_text_bytes(data[:_MAX_ACP_RESOURCE_BYTES], mime_type)
|
||||||
if text is None:
|
if text is None:
|
||||||
body = f"[Binary embedded file omitted: {len(data)} bytes, mime={mime_type or 'unknown'}]"
|
body = f"[Binary embedded file omitted: {len(data)} bytes, mime={mime_type or 'unknown'}]"
|
||||||
|
|
@ -255,12 +348,12 @@ def _embedded_resource_to_text(block: EmbeddedResourceContentBlock) -> str | Non
|
||||||
body = text
|
body = text
|
||||||
if len(data) > _MAX_ACP_RESOURCE_BYTES:
|
if len(data) > _MAX_ACP_RESOURCE_BYTES:
|
||||||
body += f"\n\n[Truncated to {_MAX_ACP_RESOURCE_BYTES} of {len(data)} bytes]"
|
body += f"\n\n[Truncated to {_MAX_ACP_RESOURCE_BYTES} of {len(data)} bytes]"
|
||||||
return _format_resource_text(uri=uri, body=body)
|
return [{"type": "text", "text": _format_resource_text(uri=uri, body=body)}]
|
||||||
|
|
||||||
text = getattr(resource, "text", None)
|
text = getattr(resource, "text", None)
|
||||||
if text:
|
if text:
|
||||||
return _format_resource_text(uri=uri, body=str(text))
|
return [{"type": "text", "text": _format_resource_text(uri=uri, body=str(text))}]
|
||||||
return None
|
return []
|
||||||
|
|
||||||
|
|
||||||
def _extract_text(
|
def _extract_text(
|
||||||
|
|
@ -323,16 +416,18 @@ def _content_blocks_to_openai_user_content(
|
||||||
parts.append(image_part)
|
parts.append(image_part)
|
||||||
continue
|
continue
|
||||||
if isinstance(block, ResourceContentBlock):
|
if isinstance(block, ResourceContentBlock):
|
||||||
resource_text = _resource_link_to_text(block)
|
resource_parts = _resource_link_to_parts(block)
|
||||||
if resource_text:
|
for part in resource_parts:
|
||||||
parts.append({"type": "text", "text": resource_text})
|
parts.append(part)
|
||||||
text_parts.append(resource_text)
|
if part.get("type") == "text":
|
||||||
|
text_parts.append(part["text"])
|
||||||
continue
|
continue
|
||||||
if isinstance(block, EmbeddedResourceContentBlock):
|
if isinstance(block, EmbeddedResourceContentBlock):
|
||||||
resource_text = _embedded_resource_to_text(block)
|
resource_parts = _embedded_resource_to_parts(block)
|
||||||
if resource_text:
|
for part in resource_parts:
|
||||||
parts.append({"type": "text", "text": resource_text})
|
parts.append(part)
|
||||||
text_parts.append(resource_text)
|
if part.get("type") == "text":
|
||||||
|
text_parts.append(part["text"])
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if not parts:
|
if not parts:
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,8 @@
|
||||||
|
import base64
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from acp.schema import (
|
from acp.schema import (
|
||||||
|
BlobResourceContents,
|
||||||
EmbeddedResourceContentBlock,
|
EmbeddedResourceContentBlock,
|
||||||
ImageContentBlock,
|
ImageContentBlock,
|
||||||
ResourceContentBlock,
|
ResourceContentBlock,
|
||||||
|
|
@ -82,3 +85,75 @@ async def test_initialize_advertises_image_prompt_capability():
|
||||||
assert response.agent_capabilities is not None
|
assert response.agent_capabilities is not None
|
||||||
assert response.agent_capabilities.prompt_capabilities is not None
|
assert response.agent_capabilities.prompt_capabilities is not None
|
||||||
assert response.agent_capabilities.prompt_capabilities.image is True
|
assert response.agent_capabilities.prompt_capabilities.image is True
|
||||||
|
|
||||||
|
|
||||||
|
# 1x1 transparent PNG — smallest valid image payload for inlining tests.
|
||||||
|
_ONE_PX_PNG = bytes.fromhex(
|
||||||
|
"89504e470d0a1a0a0000000d49484452000000010000000108060000001f15c4"
|
||||||
|
"890000000a49444154789c6300010000000500010d0a2db40000000049454e44ae426082"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_acp_resource_link_image_file_is_inlined_as_image_url(tmp_path):
    """A file resource_link with an image/png mime becomes header text + image_url."""
    image_file = tmp_path / "shot.png"
    image_file.write_bytes(_ONE_PX_PNG)

    prompt_block = TextContentBlock(type="text", text="Look at this screenshot")
    link_block = ResourceContentBlock(
        type="resource_link",
        name="shot.png",
        uri=image_file.as_uri(),
        mimeType="image/png",
    )

    content = _content_blocks_to_openai_user_content([prompt_block, link_block])

    assert isinstance(content, list)
    # Expected order: the user's text, the attachment header, then the image itself.
    assert content[0] == {"type": "text", "text": "Look at this screenshot"}

    header_part = content[1]
    assert header_part["type"] == "text"
    assert "[Attached image: shot.png]" in header_part["text"]

    image_part = content[2]
    assert image_part["type"] == "image_url"
    encoded = base64.b64encode(_ONE_PX_PNG).decode("ascii")
    assert image_part["image_url"]["url"] == "data:image/png;base64," + encoded
|
||||||
|
|
||||||
|
|
||||||
|
def test_acp_resource_link_image_mime_inferred_from_suffix(tmp_path):
    """Without a mimeType, the ``.jpg`` suffix alone should mark the link as an image."""
    image_file = tmp_path / "pic.jpg"
    # The payload is PNG bytes on purpose: only the suffix drives this code path.
    image_file.write_bytes(_ONE_PX_PNG)

    link_block = ResourceContentBlock(
        type="resource_link",
        name="pic.jpg",
        uri=image_file.as_uri(),
    )
    content = _content_blocks_to_openai_user_content([link_block])

    assert isinstance(content, list)
    image_urls = [
        part["image_url"]["url"]
        for part in content
        if part.get("type") == "image_url"
    ]
    assert len(image_urls) == 1
    assert image_urls[0].startswith("data:image/jpeg;base64,")
|
||||||
|
|
||||||
|
|
||||||
|
def test_acp_embedded_blob_image_is_inlined_as_image_url():
    """An embedded image/png blob resource is forwarded as header text + image_url."""
    encoded_png = base64.b64encode(_ONE_PX_PNG).decode("ascii")
    blob_resource = BlobResourceContents(
        uri="file:///tmp/embed.png",
        mimeType="image/png",
        blob=encoded_png,
    )
    embedded_block = EmbeddedResourceContentBlock(type="resource", resource=blob_resource)

    content = _content_blocks_to_openai_user_content([embedded_block])

    assert isinstance(content, list)

    header_part = content[0]
    assert header_part["type"] == "text"
    assert "[Attached image: embed.png]" in header_part["text"]

    # The blob must be passed through unchanged inside the data URL.
    expected_image_part = {
        "type": "image_url",
        "image_url": {"url": f"data:image/png;base64,{encoded_png}"},
    }
    assert content[1] == expected_image_part
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue