diff --git a/plugins/image_gen/openai-codex/__init__.py b/plugins/image_gen/openai-codex/__init__.py index ab524dbdd75..339e390be1f 100644 --- a/plugins/image_gen/openai-codex/__init__.py +++ b/plugins/image_gen/openai-codex/__init__.py @@ -19,6 +19,7 @@ Output is saved as PNG under ``$HERMES_HOME/cache/images/``. from __future__ import annotations +import json import logging from typing import Any, Dict, List, Optional, Tuple @@ -142,39 +143,18 @@ def _read_codex_access_token() -> Optional[str]: return None -def _build_codex_client(): - """Return an OpenAI client pointed at the ChatGPT/Codex backend, or None.""" - token = _read_codex_access_token() - if not token: - return None - try: - import openai - from agent.auxiliary_client import _codex_cloudflare_headers - - return openai.OpenAI( - api_key=token, - base_url=_CODEX_BASE_URL, - default_headers=_codex_cloudflare_headers(token), - ) - except Exception as exc: - logger.debug("Could not build Codex image client: %s", exc) - return None - - -def _collect_image_b64(client: Any, *, prompt: str, size: str, quality: str) -> Optional[str]: - """Stream a Codex Responses image_generation call and return the b64 image.""" - image_b64: Optional[str] = None - - with client.responses.stream( - model=_CODEX_CHAT_MODEL, - store=False, - instructions=_CODEX_INSTRUCTIONS, - input=[{ +def _build_responses_payload(*, prompt: str, size: str, quality: str) -> Dict[str, Any]: + """Build the Codex Responses request body for an image_generation call.""" + return { + "model": _CODEX_CHAT_MODEL, + "store": False, + "instructions": _CODEX_INSTRUCTIONS, + "input": [{ "type": "message", "role": "user", "content": [{"type": "input_text", "text": prompt}], }], - tools=[{ + "tools": [{ "type": "image_generation", "model": API_MODEL, "size": size, @@ -183,33 +163,114 @@ def _collect_image_b64(client: Any, *, prompt: str, size: str, quality: str) -> "background": "opaque", "partial_images": 1, }], - tool_choice={ + "tool_choice": { "type": "allowed_tools", "mode": "required", "tools": [{"type": "image_generation"}], }, - ) as stream: - for event in stream: - event_type = getattr(event, "type", "") - if event_type == "response.output_item.done": - item = getattr(event, "item", None) - if getattr(item, "type", None) == "image_generation_call": - result = getattr(item, "result", None) - if isinstance(result, str) and result: - image_b64 = result - elif event_type == "response.image_generation_call.partial_image": - partial = getattr(event, "partial_image_b64", None) - if isinstance(partial, str) and partial: - image_b64 = partial - final = stream.get_final_response() + "stream": True, + } - # Final-response sweep covers the case where the stream finished before - # we observed the ``output_item.done`` event for the image call. - for item in getattr(final, "output", None) or []: - if getattr(item, "type", None) == "image_generation_call": - result = getattr(item, "result", None) + +def _extract_image_b64(value: Any) -> Optional[str]: + """Return the newest image b64 embedded in a Responses event payload.""" + found: Optional[str] = None + if isinstance(value, dict): + if value.get("type") == "image_generation_call": + result = value.get("result") if isinstance(result, str) and result: - image_b64 = result + found = result + partial = value.get("partial_image_b64") + if isinstance(partial, str) and partial: + found = partial + for child in value.values(): + nested = _extract_image_b64(child) + if nested: + found = nested + elif isinstance(value, list): + for child in value: + nested = _extract_image_b64(child) + if nested: + found = nested + return found + + +def _iter_sse_json(response: Any): + """Yield JSON payloads from an SSE response without OpenAI SDK parsing. + + The ChatGPT/Codex backend can emit image-generation events newer than the + pinned Python SDK understands. Parsing raw SSE keeps this provider tolerant + of those event-shape changes. + """ + event_name: Optional[str] = None + data_lines: List[str] = [] + + def flush(): + nonlocal event_name, data_lines + if not data_lines: + event_name = None + return None + raw = "\n".join(data_lines).strip() + event = event_name + event_name = None + data_lines = [] + if not raw or raw == "[DONE]": + return None + payload = json.loads(raw) + if isinstance(payload, dict) and event and "type" not in payload: + payload["type"] = event + return payload + + for line in response.iter_lines(): + if isinstance(line, bytes): + line = line.decode("utf-8", errors="replace") + line = str(line) + if line == "": + payload = flush() + if payload is not None: + yield payload + continue + if line.startswith(":"): + continue + if line.startswith("event:"): + event_name = line[len("event:"):].strip() + elif line.startswith("data:"): + data_lines.append(line[len("data:"):].lstrip()) + + payload = flush() + if payload is not None: + yield payload + + +def _collect_image_b64(token: str, *, prompt: str, size: str, quality: str) -> Optional[str]: + """Stream a Codex Responses image_generation call and return the b64 image.""" + import httpx + from agent.auxiliary_client import _codex_cloudflare_headers + + headers = _codex_cloudflare_headers(token) + headers.update({ + "Accept": "text/event-stream", + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + }) + payload = _build_responses_payload(prompt=prompt, size=size, quality=quality) + timeout = httpx.Timeout(300.0, connect=30.0, read=300.0, write=30.0, pool=30.0) + + image_b64: Optional[str] = None + with httpx.Client(timeout=timeout, headers=headers) as http: + with http.stream("POST", f"{_CODEX_BASE_URL}/responses", json=payload) as response: + try: + response.raise_for_status() + except httpx.HTTPStatusError as exc: + exc.response.read() + body = exc.response.text[:500] + raise RuntimeError( + f"Codex Responses API returned HTTP {exc.response.status_code}: {body}" + ) from exc + for event in _iter_sse_json(response): + found = _extract_image_b64(event) + if found: + image_b64 = found return image_b64 @@ -234,7 +295,7 @@ class OpenAICodexImageGenProvider(ImageGenProvider): if not _read_codex_access_token(): return False try: - import openai # noqa: F401 + import httpx # noqa: F401 except ImportError: return False return True @@ -295,10 +356,10 @@ class OpenAICodexImageGenProvider(ImageGenProvider): ) try: - import openai # noqa: F401 + import httpx # noqa: F401 except ImportError: return error_response( - error="openai Python package not installed (pip install openai)", + error="httpx Python package not installed (pip install httpx)", error_type="missing_dependency", provider="openai-codex", aspect_ratio=aspect, @@ -307,10 +368,13 @@ class OpenAICodexImageGenProvider(ImageGenProvider): tier_id, meta = _resolve_model() size = _SIZES.get(aspect, _SIZES["square"]) - client = _build_codex_client() - if client is None: + token = _read_codex_access_token() + if not token: return error_response( - error="Could not initialize Codex image client", + error=( + "No Codex/ChatGPT OAuth credentials available. Run " + "`hermes auth codex` (or `hermes setup` → Codex) to sign in." + ), error_type="auth_required", provider="openai-codex", model=tier_id, @@ -320,7 +384,7 @@ class OpenAICodexImageGenProvider(ImageGenProvider): try: b64 = _collect_image_b64( - client, + token, prompt=prompt, size=size, quality=meta["quality"], diff --git a/tests/plugins/image_gen/test_openai_codex_provider.py b/tests/plugins/image_gen/test_openai_codex_provider.py index 3c8cf86c0a6..2940b300b36 100644 --- a/tests/plugins/image_gen/test_openai_codex_provider.py +++ b/tests/plugins/image_gen/test_openai_codex_provider.py @@ -10,7 +10,6 @@ from __future__ import annotations import importlib from pathlib import Path -from types import SimpleNamespace import pytest @@ -33,24 +32,6 @@ def _b64_png() -> str: return base64.b64encode(bytes.fromhex(_PNG_HEX)).decode() -class _FakeStream: - def __init__(self, events, final_response): - self._events = list(events) - self._final = final_response - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc, tb): - return False - - def __iter__(self): - return iter(self._events) - - def get_final_response(self): - return self._final - - @pytest.fixture(autouse=True) def _tmp_hermes_home(tmp_path, monkeypatch): monkeypatch.setenv("HERMES_HOME", str(tmp_path)) @@ -127,22 +108,7 @@ class TestGenerate: def test_generate_uses_codex_stream_path(self, provider, monkeypatch, tmp_path): monkeypatch.setattr(codex_plugin, "_read_codex_access_token", lambda: "codex-token") - - output_item = SimpleNamespace( - type="image_generation_call", - status="generating", - id="ig_test", - result=_b64_png(), - ) - done_event = SimpleNamespace(type="response.output_item.done", item=output_item) - final_response = SimpleNamespace(output=[], status="completed", output_text="") - - fake_client = SimpleNamespace( - responses=SimpleNamespace( - stream=lambda **kwargs: _FakeStream([done_event], final_response) - ) - ) - monkeypatch.setattr(codex_plugin, "_build_codex_client", lambda: fake_client) + monkeypatch.setattr(codex_plugin, "_collect_image_b64", lambda *a, **kw: _b64_png()) result = provider.generate("a cat", aspect_ratio="landscape") @@ -163,20 +129,15 @@ class TestGenerate: captured = {} - def _stream(**kwargs): - captured.update(kwargs) - output_item = SimpleNamespace( - type="image_generation_call", - status="generating", - id="ig_test", - result=_b64_png(), - ) - done_event = SimpleNamespace(type="response.output_item.done", item=output_item) - final_response = SimpleNamespace(output=[], status="completed", output_text="") - return _FakeStream([done_event], final_response) + def _collect(token, *, prompt, size, quality): + captured.update(codex_plugin._build_responses_payload( + prompt=prompt, + size=size, + quality=quality, + )) + return _b64_png() - fake_client = SimpleNamespace(responses=SimpleNamespace(stream=_stream)) - monkeypatch.setattr(codex_plugin, "_build_codex_client", lambda: fake_client) + monkeypatch.setattr(codex_plugin, "_collect_image_b64", _collect) result = provider.generate("a cat", aspect_ratio="portrait") assert result["success"] is True @@ -199,83 +160,59 @@ class TestGenerate: assert tool["background"] == "opaque" assert tool["partial_images"] == 1 - def test_partial_image_event_used_when_done_missing(self, provider, monkeypatch): - """If the stream never emits output_item.done, fall back to the - partial_image event so users at least get the latest preview frame.""" - monkeypatch.setattr(codex_plugin, "_read_codex_access_token", lambda: "codex-token") + def test_partial_image_event_used_when_done_missing(self): + """If output_item.done is missing, partial_image_b64 is accepted.""" + payload = { + "type": "response.image_generation_call.partial_image", + "partial_image_b64": _b64_png(), + } + assert codex_plugin._extract_image_b64(payload) == _b64_png() - partial_event = SimpleNamespace( - type="response.image_generation_call.partial_image", - partial_image_b64=_b64_png(), - ) - final_response = SimpleNamespace(output=[], status="completed", output_text="") + def test_sse_parser_handles_event_and_data_lines(self): + class _Response: + def iter_lines(self): + return iter([ + "event: response.output_item.done", + 'data: {"item": {"type": "image_generation_call", "result": "abc"}}', + "", + ]) - fake_client = SimpleNamespace( - responses=SimpleNamespace( - stream=lambda **kwargs: _FakeStream([partial_event], final_response) - ) - ) - monkeypatch.setattr(codex_plugin, "_build_codex_client", lambda: fake_client) + events = list(codex_plugin._iter_sse_json(_Response())) + assert events == [{ + "type": "response.output_item.done", + "item": {"type": "image_generation_call", "result": "abc"}, + }] - result = provider.generate("a cat") - assert result["success"] is True - assert Path(result["image"]).exists() - - def test_final_response_sweep_recovers_image(self, provider, monkeypatch): - """If no image_generation_call event arrives mid-stream, the - post-stream final-response sweep should still find the image.""" - monkeypatch.setattr(codex_plugin, "_read_codex_access_token", lambda: "codex-token") - - final_item = SimpleNamespace( - type="image_generation_call", - status="completed", - id="ig_final", - result=_b64_png(), - ) - final_response = SimpleNamespace(output=[final_item], status="completed", output_text="") - - fake_client = SimpleNamespace( - responses=SimpleNamespace( - stream=lambda **kwargs: _FakeStream([], final_response) - ) - ) - monkeypatch.setattr(codex_plugin, "_build_codex_client", lambda: fake_client) - - result = provider.generate("a cat") - assert result["success"] is True - assert Path(result["image"]).exists() + def test_final_response_sweep_recovers_image(self): + """Completed response output is found by recursive payload scanning.""" + payload = { + "type": "response.completed", + "response": { + "output": [{ + "type": "image_generation_call", + "status": "completed", + "id": "ig_final", + "result": _b64_png(), + }], + }, + } + assert codex_plugin._extract_image_b64(payload) == _b64_png() def test_empty_response_returns_error(self, provider, monkeypatch): monkeypatch.setattr(codex_plugin, "_read_codex_access_token", lambda: "codex-token") - - final_response = SimpleNamespace(output=[], status="completed", output_text="") - fake_client = SimpleNamespace( - responses=SimpleNamespace( - stream=lambda **kwargs: _FakeStream([], final_response) - ) - ) - monkeypatch.setattr(codex_plugin, "_build_codex_client", lambda: fake_client) + monkeypatch.setattr(codex_plugin, "_collect_image_b64", lambda *a, **kw: None) result = provider.generate("a cat") assert result["success"] is False assert result["error_type"] == "empty_response" - def test_client_init_failure_returns_auth_error(self, provider, monkeypatch): - monkeypatch.setattr(codex_plugin, "_read_codex_access_token", lambda: "codex-token") - monkeypatch.setattr(codex_plugin, "_build_codex_client", lambda: None) - - result = provider.generate("a cat") - assert result["success"] is False - assert result["error_type"] == "auth_required" - def test_stream_exception_returns_api_error(self, provider, monkeypatch): monkeypatch.setattr(codex_plugin, "_read_codex_access_token", lambda: "codex-token") - def _boom(**kwargs): + def _boom(*args, **kwargs): raise RuntimeError("cloudflare 403") - fake_client = SimpleNamespace(responses=SimpleNamespace(stream=_boom)) - monkeypatch.setattr(codex_plugin, "_build_codex_client", lambda: fake_client) + monkeypatch.setattr(codex_plugin, "_collect_image_b64", _boom) result = provider.generate("a cat") assert result["success"] is False