mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-27 11:22:03 +00:00
feat(pets): OpenRouter + Nous Portal image backend
Reference-grounded image provider over the OpenRouter-compatible chat-completions image protocol (Gemini Flash Image et al.). Nous Portal proxies OpenRouter, so one provider serves both — giving pet generation a reference-capable backend beyond OpenAI gpt-image.
This commit is contained in:
parent
32f837add1
commit
3faf768cde
3 changed files with 717 additions and 0 deletions
414
plugins/image_gen/openrouter/__init__.py
Normal file
414
plugins/image_gen/openrouter/__init__.py
Normal file
|
|
@ -0,0 +1,414 @@
|
|||
"""OpenRouter-compatible image generation backend (OpenRouter + Nous Portal).
|
||||
|
||||
Both OpenRouter and the Nous Portal inference endpoint speak the same
|
||||
OpenAI-style ``/chat/completions`` image-generation protocol: send
|
||||
``modalities: ["image", "text"]`` with an image-output model (e.g.
|
||||
``google/gemini-2.5-flash-image``), pass reference images as ``image_url``
|
||||
content parts for grounding, and read the generated images back from
|
||||
``choices[0].message.images[].image_url.url`` (a ``data:image/...;base64`` URI).
|
||||
|
||||
Nous Portal proxies OpenRouter, so one implementation services both — we only
|
||||
swap the resolved ``(base_url, api_key)``. Credentials are resolved through the
|
||||
agent's existing :func:`~hermes_cli.runtime_provider.resolve_runtime_provider`,
|
||||
which already understands OpenRouter's key pool and the Nous OAuth device-code
|
||||
token, so this plugin never reinvents auth.
|
||||
|
||||
Reference grounding is the reason pet sprite generation cares about this
|
||||
backend: each animation row must stay the same character as the chosen base
|
||||
frame, which only works on models that accept image input. Gemini Flash Image
|
||||
("nano-banana") does, so both providers advertise image-to-image support.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import logging
|
||||
import mimetypes
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from agent.image_gen_provider import (
|
||||
DEFAULT_ASPECT_RATIO,
|
||||
ImageGenProvider,
|
||||
error_response,
|
||||
resolve_aspect_ratio,
|
||||
save_b64_image,
|
||||
save_url_image,
|
||||
success_response,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Default image-output model. Gemini 2.5 Flash Image ("nano-banana") is GA on
|
||||
# OpenRouter, accepts reference images for grounding, and honors
|
||||
# ``image_config.aspect_ratio``.
|
||||
DEFAULT_MODEL = "google/gemini-2.5-flash-image"
|
||||
|
||||
# Semantic aspect ratio (the image_gen contract) → OpenRouter's image_config
|
||||
# aspect_ratio strings.
|
||||
_ASPECT_RATIOS = {
|
||||
"square": "1:1",
|
||||
"landscape": "16:9",
|
||||
"portrait": "9:16",
|
||||
}
|
||||
|
||||
# Gemini Flash Image accepts up to 3 input images per prompt; clamp references
|
||||
# so we never overflow the model's limit.
|
||||
_MAX_REFERENCE_IMAGES = 3
|
||||
|
||||
_REQUEST_TIMEOUT = 180.0
|
||||
|
||||
|
||||
def _load_image_gen_config() -> Dict[str, Any]:
|
||||
"""Read the ``image_gen`` section from config.yaml (``{}`` on failure)."""
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
|
||||
cfg = load_config()
|
||||
section = cfg.get("image_gen") if isinstance(cfg, dict) else None
|
||||
return section if isinstance(section, dict) else {}
|
||||
except Exception as exc: # noqa: BLE001 - config is best-effort
|
||||
logger.debug("could not load image_gen config: %s", exc)
|
||||
return {}
|
||||
|
||||
|
||||
def _to_image_url_part(ref: str) -> Optional[str]:
|
||||
"""Turn a reference (local path or http URL) into an ``image_url`` value.
|
||||
|
||||
Remote URLs pass through unchanged; local files are inlined as base64 data
|
||||
URIs so the request is self-contained (the provider endpoint can't reach a
|
||||
path on our disk). Returns ``None`` when the reference can't be read.
|
||||
"""
|
||||
ref = str(ref or "").strip()
|
||||
if not ref:
|
||||
return None
|
||||
if ref.startswith(("http://", "https://", "data:")):
|
||||
return ref
|
||||
path = Path(ref)
|
||||
try:
|
||||
raw = path.read_bytes()
|
||||
except OSError as exc:
|
||||
logger.debug("could not read reference image %s: %s", ref, exc)
|
||||
return None
|
||||
mime = mimetypes.guess_type(path.name)[0] or "image/png"
|
||||
encoded = base64.b64encode(raw).decode("ascii")
|
||||
return f"data:{mime};base64,{encoded}"
|
||||
|
||||
|
||||
def _extract_images(payload: Dict[str, Any]) -> List[str]:
|
||||
"""Pull generated image URLs from a chat-completions response.
|
||||
|
||||
OpenRouter returns generated images under
|
||||
``choices[0].message.images[].image_url.url`` (typically a base64 data URI).
|
||||
"""
|
||||
out: List[str] = []
|
||||
choices = payload.get("choices") if isinstance(payload, dict) else None
|
||||
if not isinstance(choices, list):
|
||||
return out
|
||||
for choice in choices:
|
||||
message = choice.get("message") if isinstance(choice, dict) else None
|
||||
images = message.get("images") if isinstance(message, dict) else None
|
||||
if not isinstance(images, list):
|
||||
continue
|
||||
for image in images:
|
||||
if not isinstance(image, dict):
|
||||
continue
|
||||
image_url = image.get("image_url")
|
||||
url = image_url.get("url") if isinstance(image_url, dict) else None
|
||||
if isinstance(url, str) and url.strip():
|
||||
out.append(url.strip())
|
||||
return out
|
||||
|
||||
|
||||
class OpenRouterCompatImageProvider(ImageGenProvider):
|
||||
"""Image generation over an OpenRouter-compatible chat-completions endpoint.
|
||||
|
||||
Instantiated once per backend (OpenRouter, Nous Portal). The two differ only
|
||||
in which runtime provider supplies ``(base_url, api_key)`` and in the config
|
||||
namespace used for the model override.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
provider_name: str,
|
||||
display_name: str,
|
||||
runtime_name: str,
|
||||
config_key: str,
|
||||
model_env_var: str,
|
||||
setup_schema: Dict[str, Any],
|
||||
) -> None:
|
||||
self._name = provider_name
|
||||
self._display = display_name
|
||||
self._runtime_name = runtime_name
|
||||
self._config_key = config_key
|
||||
self._model_env_var = model_env_var
|
||||
self._setup_schema = setup_schema
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def display_name(self) -> str:
|
||||
return self._display
|
||||
|
||||
def _resolve_runtime(self) -> Dict[str, Any]:
|
||||
"""Resolve ``(base_url, api_key)`` via the shared runtime resolver."""
|
||||
from hermes_cli.runtime_provider import resolve_runtime_provider
|
||||
|
||||
return resolve_runtime_provider(requested=self._runtime_name)
|
||||
|
||||
def is_available(self) -> bool:
|
||||
try:
|
||||
runtime = self._resolve_runtime()
|
||||
except Exception as exc: # noqa: BLE001 - treat resolution failure as unavailable
|
||||
logger.debug("%s runtime resolution failed: %s", self._name, exc)
|
||||
return False
|
||||
return bool(str(runtime.get("api_key") or "").strip())
|
||||
|
||||
def capabilities(self) -> Dict[str, Any]:
|
||||
# Both text-to-image and image-to-image (reference grounding) — the
|
||||
# latter is what makes this backend usable for pet sprite rows.
|
||||
return {
|
||||
"modalities": ["text", "image"],
|
||||
"max_reference_images": _MAX_REFERENCE_IMAGES,
|
||||
}
|
||||
|
||||
def list_models(self) -> List[Dict[str, Any]]:
|
||||
return [
|
||||
{
|
||||
"id": DEFAULT_MODEL,
|
||||
"display": "Gemini 2.5 Flash Image (nano-banana)",
|
||||
"strengths": "Reference-grounded edits; aspect-ratio control",
|
||||
}
|
||||
]
|
||||
|
||||
def default_model(self) -> Optional[str]:
|
||||
return self._resolve_model()
|
||||
|
||||
def get_setup_schema(self) -> Dict[str, Any]:
|
||||
return dict(self._setup_schema)
|
||||
|
||||
def _resolve_model(self) -> str:
|
||||
"""Pick the image model: env override → config → :data:`DEFAULT_MODEL`."""
|
||||
env_override = os.environ.get(self._model_env_var, "").strip()
|
||||
if env_override:
|
||||
return env_override
|
||||
cfg = _load_image_gen_config()
|
||||
scoped = cfg.get(self._config_key) if isinstance(cfg.get(self._config_key), dict) else {}
|
||||
if isinstance(scoped, dict):
|
||||
value = scoped.get("model")
|
||||
if isinstance(value, str) and value.strip():
|
||||
return value.strip()
|
||||
return DEFAULT_MODEL
|
||||
|
||||
def generate(
|
||||
self,
|
||||
prompt: str,
|
||||
aspect_ratio: str = DEFAULT_ASPECT_RATIO,
|
||||
*,
|
||||
image_url: Optional[str] = None,
|
||||
reference_image_urls: Optional[List[str]] = None,
|
||||
**kwargs: Any,
|
||||
) -> Dict[str, Any]:
|
||||
import requests
|
||||
|
||||
try:
|
||||
runtime = self._resolve_runtime()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
return error_response(
|
||||
error=f"Could not resolve {self._display} credentials: {exc}",
|
||||
error_type="missing_api_key",
|
||||
provider=self._name,
|
||||
aspect_ratio=aspect_ratio,
|
||||
)
|
||||
api_key = str(runtime.get("api_key") or "").strip()
|
||||
base_url = str(runtime.get("base_url") or "").strip().rstrip("/")
|
||||
if not api_key or not base_url:
|
||||
return error_response(
|
||||
error=(
|
||||
f"No {self._display} credentials found. "
|
||||
f"Configure {self._display} in `hermes tools` → Image Generation."
|
||||
),
|
||||
error_type="missing_api_key",
|
||||
provider=self._name,
|
||||
aspect_ratio=aspect_ratio,
|
||||
)
|
||||
|
||||
model_id = self._resolve_model()
|
||||
aspect = resolve_aspect_ratio(aspect_ratio)
|
||||
or_aspect = _ASPECT_RATIOS.get(aspect, "1:1")
|
||||
|
||||
# Collect every reference: the pet generator passes local paths via the
|
||||
# ``reference_images`` kwarg; the generic tool surface uses ``image_url``
|
||||
# / ``reference_image_urls``. Accept all three.
|
||||
references: List[str] = []
|
||||
for ref in kwargs.get("reference_images") or []:
|
||||
references.append(str(ref))
|
||||
if image_url:
|
||||
references.append(str(image_url))
|
||||
for ref in reference_image_urls or []:
|
||||
references.append(str(ref))
|
||||
|
||||
content: List[Dict[str, Any]] = [{"type": "text", "text": prompt}]
|
||||
for ref in references[:_MAX_REFERENCE_IMAGES]:
|
||||
part = _to_image_url_part(ref)
|
||||
if part:
|
||||
content.append({"type": "image_url", "image_url": {"url": part}})
|
||||
|
||||
payload: Dict[str, Any] = {
|
||||
"model": model_id,
|
||||
"modalities": ["image", "text"],
|
||||
"messages": [{"role": "user", "content": content}],
|
||||
"image_config": {"aspect_ratio": or_aspect},
|
||||
}
|
||||
headers = {
|
||||
"Authorization": f"Bearer {api_key}",
|
||||
"Content-Type": "application/json",
|
||||
# OpenRouter attribution headers (harmless against Nous Portal).
|
||||
"HTTP-Referer": "https://github.com/NousResearch/hermes-agent",
|
||||
"X-Title": "Hermes Agent",
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
f"{base_url}/chat/completions",
|
||||
headers=headers,
|
||||
json=payload,
|
||||
timeout=_REQUEST_TIMEOUT,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except requests.HTTPError as exc:
|
||||
resp = exc.response
|
||||
status = resp.status_code if resp is not None else 0
|
||||
try:
|
||||
err_msg = resp.json().get("error", {}).get("message", resp.text[:300])
|
||||
except Exception: # noqa: BLE001
|
||||
err_msg = resp.text[:300] if resp is not None else str(exc)
|
||||
logger.error("%s image gen failed (%d): %s", self._name, status, err_msg)
|
||||
return error_response(
|
||||
error=f"{self._display} image generation failed ({status}): {err_msg}",
|
||||
error_type="api_error",
|
||||
provider=self._name,
|
||||
model=model_id,
|
||||
prompt=prompt,
|
||||
aspect_ratio=aspect,
|
||||
)
|
||||
except requests.Timeout:
|
||||
return error_response(
|
||||
error=f"{self._display} image generation timed out "
|
||||
f"({int(_REQUEST_TIMEOUT)}s)",
|
||||
error_type="timeout",
|
||||
provider=self._name,
|
||||
model=model_id,
|
||||
prompt=prompt,
|
||||
aspect_ratio=aspect,
|
||||
)
|
||||
except requests.ConnectionError as exc:
|
||||
return error_response(
|
||||
error=f"{self._display} connection error: {exc}",
|
||||
error_type="connection_error",
|
||||
provider=self._name,
|
||||
model=model_id,
|
||||
prompt=prompt,
|
||||
aspect_ratio=aspect,
|
||||
)
|
||||
|
||||
try:
|
||||
result = response.json()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
return error_response(
|
||||
error=f"{self._display} returned invalid JSON: {exc}",
|
||||
error_type="invalid_response",
|
||||
provider=self._name,
|
||||
model=model_id,
|
||||
prompt=prompt,
|
||||
aspect_ratio=aspect,
|
||||
)
|
||||
|
||||
images = _extract_images(result)
|
||||
if not images:
|
||||
# A response with text but no image usually means the model didn't
|
||||
# honor image output (wrong model or modalities); surface that.
|
||||
return error_response(
|
||||
error=(
|
||||
f"{self._display} returned no image. Ensure the model "
|
||||
f"'{model_id}' supports image output."
|
||||
),
|
||||
error_type="empty_response",
|
||||
provider=self._name,
|
||||
model=model_id,
|
||||
prompt=prompt,
|
||||
aspect_ratio=aspect,
|
||||
)
|
||||
|
||||
first = images[0]
|
||||
try:
|
||||
if first.startswith("data:"):
|
||||
b64 = first.split(",", 1)[1] if "," in first else ""
|
||||
saved_path = save_b64_image(b64, prefix=f"{self._name}_gen")
|
||||
else:
|
||||
saved_path = save_url_image(first, prefix=f"{self._name}_gen")
|
||||
except Exception as exc: # noqa: BLE001
|
||||
return error_response(
|
||||
error=f"Could not save generated image: {exc}",
|
||||
error_type="io_error",
|
||||
provider=self._name,
|
||||
model=model_id,
|
||||
prompt=prompt,
|
||||
aspect_ratio=aspect,
|
||||
)
|
||||
|
||||
return success_response(
|
||||
image=str(saved_path),
|
||||
model=model_id,
|
||||
prompt=prompt,
|
||||
aspect_ratio=aspect,
|
||||
provider=self._name,
|
||||
)
|
||||
|
||||
|
||||
def _build_providers() -> List[OpenRouterCompatImageProvider]:
|
||||
return [
|
||||
OpenRouterCompatImageProvider(
|
||||
provider_name="openrouter",
|
||||
display_name="OpenRouter",
|
||||
runtime_name="openrouter",
|
||||
config_key="openrouter",
|
||||
model_env_var="OPENROUTER_IMAGE_MODEL",
|
||||
setup_schema={
|
||||
"name": "OpenRouter (image)",
|
||||
"badge": "paid",
|
||||
"tag": "Gemini Flash Image & more via OpenRouter; uses OPENROUTER_API_KEY",
|
||||
"env_vars": [
|
||||
{
|
||||
"key": "OPENROUTER_API_KEY",
|
||||
"prompt": "OpenRouter API key",
|
||||
"url": "https://openrouter.ai/keys",
|
||||
}
|
||||
],
|
||||
},
|
||||
),
|
||||
OpenRouterCompatImageProvider(
|
||||
provider_name="nous",
|
||||
display_name="Nous Portal",
|
||||
runtime_name="nous",
|
||||
config_key="nous",
|
||||
model_env_var="NOUS_IMAGE_MODEL",
|
||||
setup_schema={
|
||||
"name": "Nous Portal (image)",
|
||||
"badge": "subscription",
|
||||
"tag": "Reference-grounded image generation via Nous Portal (OpenRouter-backed)",
|
||||
"env_vars": [],
|
||||
"requires_nous_auth": True,
|
||||
},
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def register(ctx: Any) -> None:
|
||||
"""Register the OpenRouter + Nous Portal image gen providers."""
|
||||
for provider in _build_providers():
|
||||
ctx.register_image_gen_provider(provider)
|
||||
7
plugins/image_gen/openrouter/plugin.yaml
Normal file
7
plugins/image_gen/openrouter/plugin.yaml
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
name: openrouter
|
||||
version: 1.0.0
|
||||
description: "OpenRouter + Nous Portal image generation (chat-completions image output; reference-grounded). Text-to-image and image-to-image."
|
||||
author: Hermes Agent
|
||||
kind: backend
|
||||
requires_env:
|
||||
- OPENROUTER_API_KEY
|
||||
296
tests/plugins/image_gen/test_openrouter_compat_provider.py
Normal file
296
tests/plugins/image_gen/test_openrouter_compat_provider.py
Normal file
|
|
@ -0,0 +1,296 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Tests for the OpenRouter-compatible image gen provider (OpenRouter + Nous)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
_RUNTIME = "hermes_cli.runtime_provider.resolve_runtime_provider"
|
||||
_PNG_DATA_URI = "data:image/png;base64,dGVzdC1pbWFnZS1kYXRh" # "test-image-data"
|
||||
|
||||
|
||||
def _runtime_ok(**over):
|
||||
base = {
|
||||
"provider": "openrouter",
|
||||
"api_mode": "chat_completions",
|
||||
"base_url": "https://openrouter.ai/api/v1",
|
||||
"api_key": "sk-or-test",
|
||||
"source": "env",
|
||||
}
|
||||
base.update(over)
|
||||
return base
|
||||
|
||||
|
||||
def _mock_chat_response(images):
|
||||
resp = MagicMock()
|
||||
resp.status_code = 200
|
||||
resp.raise_for_status = MagicMock()
|
||||
resp.json.return_value = {
|
||||
"choices": [
|
||||
{
|
||||
"message": {
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"images": [
|
||||
{"type": "image_url", "image_url": {"url": u}} for u in images
|
||||
],
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
return resp
|
||||
|
||||
|
||||
def _openrouter():
|
||||
from plugins.image_gen.openrouter import OpenRouterCompatImageProvider
|
||||
|
||||
return OpenRouterCompatImageProvider(
|
||||
provider_name="openrouter",
|
||||
display_name="OpenRouter",
|
||||
runtime_name="openrouter",
|
||||
config_key="openrouter",
|
||||
model_env_var="OPENROUTER_IMAGE_MODEL",
|
||||
setup_schema={"name": "OpenRouter (image)", "badge": "paid", "env_vars": []},
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Provider class
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestProviderClass:
|
||||
def test_names(self):
|
||||
from plugins.image_gen.openrouter import _build_providers
|
||||
|
||||
names = {p.name for p in _build_providers()}
|
||||
assert names == {"openrouter", "nous"}
|
||||
|
||||
def test_display_names(self):
|
||||
from plugins.image_gen.openrouter import _build_providers
|
||||
|
||||
by_name = {p.name: p for p in _build_providers()}
|
||||
assert by_name["openrouter"].display_name == "OpenRouter"
|
||||
assert by_name["nous"].display_name == "Nous Portal"
|
||||
|
||||
def test_capabilities_support_image_input(self):
|
||||
caps = _openrouter().capabilities()
|
||||
assert "image" in caps["modalities"]
|
||||
assert caps["max_reference_images"] >= 1
|
||||
|
||||
def test_is_available_with_key(self):
|
||||
with patch(_RUNTIME, return_value=_runtime_ok()):
|
||||
assert _openrouter().is_available() is True
|
||||
|
||||
def test_is_available_without_key(self):
|
||||
with patch(_RUNTIME, return_value=_runtime_ok(api_key="")):
|
||||
assert _openrouter().is_available() is False
|
||||
|
||||
def test_is_available_on_resolution_error(self):
|
||||
with patch(_RUNTIME, side_effect=RuntimeError("boom")):
|
||||
assert _openrouter().is_available() is False
|
||||
|
||||
def test_default_model(self):
|
||||
from plugins.image_gen.openrouter import DEFAULT_MODEL
|
||||
|
||||
with patch("plugins.image_gen.openrouter._load_image_gen_config", return_value={}):
|
||||
assert _openrouter().default_model() == DEFAULT_MODEL
|
||||
assert DEFAULT_MODEL == "google/gemini-2.5-flash-image"
|
||||
|
||||
def test_model_env_override(self, monkeypatch):
|
||||
monkeypatch.setenv("OPENROUTER_IMAGE_MODEL", "black-forest-labs/flux.2-pro")
|
||||
assert _openrouter()._resolve_model() == "black-forest-labs/flux.2-pro"
|
||||
|
||||
def test_model_config_override(self):
|
||||
cfg = {"openrouter": {"model": "google/gemini-3.1-flash-image-preview"}}
|
||||
with patch("plugins.image_gen.openrouter._load_image_gen_config", return_value=cfg):
|
||||
assert _openrouter()._resolve_model() == "google/gemini-3.1-flash-image-preview"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestHelpers:
|
||||
def test_to_image_url_part_passthrough_url(self):
|
||||
from plugins.image_gen.openrouter import _to_image_url_part
|
||||
|
||||
assert _to_image_url_part("https://x/y.png") == "https://x/y.png"
|
||||
assert _to_image_url_part("data:image/png;base64,AAAA") == "data:image/png;base64,AAAA"
|
||||
|
||||
def test_to_image_url_part_inlines_local_file(self, tmp_path):
|
||||
from plugins.image_gen.openrouter import _to_image_url_part
|
||||
|
||||
f = tmp_path / "base.png"
|
||||
f.write_bytes(b"\x89PNG\r\n")
|
||||
part = _to_image_url_part(str(f))
|
||||
assert part.startswith("data:image/png;base64,")
|
||||
decoded = base64.b64decode(part.split(",", 1)[1])
|
||||
assert decoded == b"\x89PNG\r\n"
|
||||
|
||||
def test_to_image_url_part_missing_file(self):
|
||||
from plugins.image_gen.openrouter import _to_image_url_part
|
||||
|
||||
assert _to_image_url_part("/no/such/file.png") is None
|
||||
|
||||
def test_extract_images(self):
|
||||
from plugins.image_gen.openrouter import _extract_images
|
||||
|
||||
payload = {
|
||||
"choices": [
|
||||
{"message": {"images": [{"image_url": {"url": "data:image/png;base64,AA"}}]}}
|
||||
]
|
||||
}
|
||||
assert _extract_images(payload) == ["data:image/png;base64,AA"]
|
||||
|
||||
def test_extract_images_empty(self):
|
||||
from plugins.image_gen.openrouter import _extract_images
|
||||
|
||||
assert _extract_images({"choices": [{"message": {"content": "no image"}}]}) == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# generate()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGenerate:
|
||||
def test_missing_credentials(self):
|
||||
with patch(_RUNTIME, return_value=_runtime_ok(api_key="")):
|
||||
result = _openrouter().generate(prompt="a pet")
|
||||
assert result["success"] is False
|
||||
assert result["error_type"] == "missing_api_key"
|
||||
|
||||
def test_success_data_uri(self):
|
||||
with patch(_RUNTIME, return_value=_runtime_ok()), \
|
||||
patch("requests.post", return_value=_mock_chat_response([_PNG_DATA_URI])), \
|
||||
patch(
|
||||
"plugins.image_gen.openrouter.save_b64_image",
|
||||
return_value=Path("/tmp/openrouter_gen.png"),
|
||||
) as mock_save:
|
||||
result = _openrouter().generate(prompt="a pet")
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["image"] == "/tmp/openrouter_gen.png"
|
||||
assert result["provider"] == "openrouter"
|
||||
mock_save.assert_called_once()
|
||||
|
||||
def test_success_http_url(self):
|
||||
with patch(_RUNTIME, return_value=_runtime_ok()), \
|
||||
patch("requests.post", return_value=_mock_chat_response(["https://cdn/x.png"])), \
|
||||
patch(
|
||||
"plugins.image_gen.openrouter.save_url_image",
|
||||
return_value=Path("/tmp/openrouter_gen_url.png"),
|
||||
) as mock_save_url:
|
||||
result = _openrouter().generate(prompt="a pet")
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["image"] == "/tmp/openrouter_gen_url.png"
|
||||
mock_save_url.assert_called_once()
|
||||
|
||||
def test_empty_response(self):
|
||||
with patch(_RUNTIME, return_value=_runtime_ok()), \
|
||||
patch("requests.post", return_value=_mock_chat_response([])):
|
||||
result = _openrouter().generate(prompt="a pet")
|
||||
assert result["success"] is False
|
||||
assert result["error_type"] == "empty_response"
|
||||
|
||||
def test_payload_shape_and_references(self, tmp_path):
|
||||
"""Wire payload must carry image modalities, aspect_ratio, and the
|
||||
reference image inlined as a data URI (this is what makes pet rows
|
||||
stay on-model)."""
|
||||
ref = tmp_path / "base.png"
|
||||
ref.write_bytes(b"\x89PNG\r\n")
|
||||
|
||||
with patch(_RUNTIME, return_value=_runtime_ok()), \
|
||||
patch("requests.post", return_value=_mock_chat_response([_PNG_DATA_URI])) as mock_post, \
|
||||
patch("plugins.image_gen.openrouter.save_b64_image", return_value=Path("/tmp/x.png")):
|
||||
_openrouter().generate(
|
||||
prompt="a pet", aspect_ratio="square", reference_images=[str(ref)]
|
||||
)
|
||||
|
||||
payload = mock_post.call_args.kwargs["json"]
|
||||
assert payload["modalities"] == ["image", "text"]
|
||||
assert payload["image_config"]["aspect_ratio"] == "1:1"
|
||||
content = payload["messages"][0]["content"]
|
||||
assert content[0] == {"type": "text", "text": "a pet"}
|
||||
image_parts = [c for c in content if c["type"] == "image_url"]
|
||||
assert len(image_parts) == 1
|
||||
assert image_parts[0]["image_url"]["url"].startswith("data:image/png;base64,")
|
||||
|
||||
def test_auth_header(self):
|
||||
with patch(_RUNTIME, return_value=_runtime_ok()), \
|
||||
patch("requests.post", return_value=_mock_chat_response([_PNG_DATA_URI])) as mock_post, \
|
||||
patch("plugins.image_gen.openrouter.save_b64_image", return_value=Path("/tmp/x.png")):
|
||||
_openrouter().generate(prompt="a pet")
|
||||
|
||||
headers = mock_post.call_args.kwargs["headers"]
|
||||
assert headers["Authorization"] == "Bearer sk-or-test"
|
||||
|
||||
def test_posts_to_resolved_base_url(self):
|
||||
"""Nous routes to its own base URL — proves the same code serves both."""
|
||||
nous_runtime = _runtime_ok(
|
||||
provider="nous", base_url="https://inference.nousresearch.com/v1", api_key="nous-tok"
|
||||
)
|
||||
with patch(_RUNTIME, return_value=nous_runtime), \
|
||||
patch("requests.post", return_value=_mock_chat_response([_PNG_DATA_URI])) as mock_post, \
|
||||
patch("plugins.image_gen.openrouter.save_b64_image", return_value=Path("/tmp/x.png")):
|
||||
from plugins.image_gen.openrouter import _build_providers
|
||||
|
||||
nous = {p.name: p for p in _build_providers()}["nous"]
|
||||
result = nous.generate(prompt="a pet")
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["provider"] == "nous"
|
||||
url = mock_post.call_args[0][0]
|
||||
assert url == "https://inference.nousresearch.com/v1/chat/completions"
|
||||
|
||||
def test_api_error(self):
|
||||
import requests as req_lib
|
||||
|
||||
resp = MagicMock()
|
||||
resp.status_code = 401
|
||||
resp.text = "Unauthorized"
|
||||
resp.json.return_value = {"error": {"message": "Invalid API key"}}
|
||||
resp.raise_for_status.side_effect = req_lib.HTTPError(response=resp)
|
||||
|
||||
with patch(_RUNTIME, return_value=_runtime_ok()), \
|
||||
patch("requests.post", return_value=resp):
|
||||
result = _openrouter().generate(prompt="a pet")
|
||||
assert result["success"] is False
|
||||
assert result["error_type"] == "api_error"
|
||||
|
||||
def test_timeout(self):
|
||||
import requests as req_lib
|
||||
|
||||
with patch(_RUNTIME, return_value=_runtime_ok()), \
|
||||
patch("requests.post", side_effect=req_lib.Timeout()):
|
||||
result = _openrouter().generate(prompt="a pet")
|
||||
assert result["success"] is False
|
||||
assert result["error_type"] == "timeout"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Registration + pet integration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRegistration:
|
||||
def test_register_both(self):
|
||||
from plugins.image_gen.openrouter import register
|
||||
|
||||
ctx = MagicMock()
|
||||
register(ctx)
|
||||
registered = [c.args[0].name for c in ctx.register_image_gen_provider.call_args_list]
|
||||
assert set(registered) == {"openrouter", "nous"}
|
||||
|
||||
def test_both_are_reference_capable_for_pets(self):
|
||||
from agent.pet.generate.imagegen import _REF_CAPABLE
|
||||
|
||||
assert "openrouter" in _REF_CAPABLE
|
||||
assert "nous" in _REF_CAPABLE
|
||||
Loading…
Add table
Add a link
Reference in a new issue