feat(pets): generation RPCs, non-blocking gallery + gateway plumbing

- pet.generate / pet.hatch (parallel rows, off the reader thread) +
  cooperative pet.cancel; pet.export / pet.rename.
- pet.gallery localOnly fast path + background manifest prefetch so the
  picker never blocks on petdex; rename follows the active-pet config.
- gateway request gains optional timeout + AbortSignal for real Stop.
This commit is contained in:
Brooklyn Nicholson 2026-06-24 13:48:38 -05:00
parent 3faf768cde
commit aab49f6927
5 changed files with 632 additions and 31 deletions

View file

@ -94,7 +94,7 @@ export function useGatewayRequest() {
}, [])
const requestGateway = useCallback(
async <T>(method: string, params: Record<string, unknown> = {}) => {
async <T>(method: string, params: Record<string, unknown> = {}, timeoutMs?: number, signal?: AbortSignal) => {
const gateway = gatewayRef.current
if (!gateway) {
@ -102,7 +102,7 @@ export function useGatewayRequest() {
}
try {
return await gateway.request<T>(method, params)
return await gateway.request<T>(method, params, timeoutMs, signal)
} catch (error) {
const message = error instanceof Error ? error.message : String(error)
@ -128,7 +128,7 @@ export function useGatewayRequest() {
throw error
}
return recovered.request<T>(method, params)
return recovered.request<T>(method, params, timeoutMs, signal)
}
},
[ensureGatewayOpen]

View file

@ -217,29 +217,67 @@ export class JsonRpcGatewayClient {
return () => this.stateHandlers.delete(handler)
}
request<T>(method: string, params: Record<string, unknown> = {}, timeoutMs = this.options.requestTimeoutMs): Promise<T> {
request<T>(
method: string,
params: Record<string, unknown> = {},
timeoutMs = this.options.requestTimeoutMs,
signal?: AbortSignal
): Promise<T> {
const socket = this.socket
if (!socket || socket.readyState !== WebSocket.OPEN) {
return Promise.reject(new Error(this.options.notConnectedErrorMessage))
}
if (signal?.aborted) {
return Promise.reject(new DOMException('Aborted', 'AbortError'))
}
const id = this.options.createRequestId(++this.nextId)
return new Promise<T>((resolve, reject) => {
let onAbort: (() => void) | undefined
const detach = () => {
if (onAbort && signal) {
signal.removeEventListener('abort', onAbort)
}
}
const pending: PendingCall = {
reject,
resolve: value => resolve(value as T)
resolve: value => {
detach()
resolve(value as T)
},
reject: error => {
detach()
reject(error)
}
}
if (timeoutMs > 0) {
pending.timer = setTimeout(() => {
if (this.pending.delete(id)) {
detach()
reject(new Error(`request timed out: ${method}`))
}
}, timeoutMs)
}
// Abort drops the pending call immediately (no dangling resolver/timer);
// server-side cancellation is a separate cooperative RPC where it matters.
if (signal) {
onAbort = () => {
const call = this.pending.get(id)
if (call?.timer) {
clearTimeout(call.timer)
}
this.pending.delete(id)
detach()
reject(new DOMException('Aborted', 'AbortError'))
}
signal.addEventListener('abort', onAbort, { once: true })
}
this.pending.set(id, pending)
try {
@ -253,6 +291,7 @@ export class JsonRpcGatewayClient {
)
} catch (error) {
this.clearPending(id)
detach()
reject(error instanceof Error ? error : new Error(String(error)))
}
})

View file

@ -416,6 +416,26 @@ def _clear_active_if(slug: str) -> bool:
return True
def _rename_active_if(old_slug: str, new_slug: str) -> bool:
"""Repoint the active pet from ``old_slug`` to ``new_slug`` iff it's active.
Used when a rename realigns a pet's slug/dir: if the renamed pet was the
active one, the config must follow or surfaces point at a now-missing dir.
Preserves the ``enabled`` flag. Returns whether anything changed.
"""
if not new_slug or old_slug == new_slug:
return False
from hermes_cli.config import load_config, save_config
cfg = load_config()
pet = cfg.setdefault("display", {}).setdefault("pet", {})
if not isinstance(pet, dict) or str(pet.get("slug", "") or "") != old_slug:
return False
pet["slug"] = new_slug
save_config(cfg)
return True
def _interactive_pick(pets) -> str:
"""Minimal numbered picker (avoids curses dep for a tiny list)."""
_print("Installed pets:")

View file

@ -0,0 +1,180 @@
"""Gateway RPC tests for pet generation (pet.generate / pet.hatch).
Image generation is mocked, so these assert the RPC contract + staging behavior
(draft tokens, data-URI previews, expiry, activation) without any API calls.
"""
from __future__ import annotations
import pytest
pytest.importorskip("PIL")
from PIL import Image # noqa: E402
from tui_gateway import server # noqa: E402
def _png(path):
Image.new("RGBA", (64, 64), (200, 80, 80, 255)).save(path)
def test_pet_generate_requires_prompt():
resp = server._methods["pet.generate"]("r1", {"prompt": " "})
assert "error" in resp
def test_pet_generate_returns_token_and_previews(monkeypatch, tmp_path):
import agent.pet.generate as gen
def fake_drafts(prompt, *, n=4, style="auto", on_draft=None, is_cancelled=None):
paths = []
for i in range(n):
p = tmp_path / f"d{i}.png"
_png(p)
paths.append(p)
if on_draft is not None:
on_draft(i, p)
return paths
monkeypatch.setattr(gen, "generate_base_drafts", fake_drafts)
resp = server._methods["pet.generate"]("r2", {"prompt": "a robot fox", "count": 4})
result = resp["result"]
assert result["ok"]
assert len(result["drafts"]) == 4
assert all(d["dataUri"].startswith("data:image/png;base64,") for d in result["drafts"])
# Drafts are staged on disk under the returned token.
staged = server._pet_gen_root() / result["token"] / "draft-0.png"
assert staged.is_file()
def test_pet_cancel_unknown_token_is_noop():
resp = server._methods["pet.cancel"]("c0", {"token": "missing"})
assert resp["result"]["ok"] is True
def test_pet_generate_cancel_stops_run(monkeypatch, tmp_path):
import agent.pet.generate as gen
seen: dict = {}
def cap_emit(event, sid, payload=None):
# Capture the token from the up-front init event so we can cancel it.
if event == "pet.generate.progress" and payload and payload.get("token") and not payload.get("dataUri"):
seen["token"] = payload["token"]
monkeypatch.setattr(server, "_emit", cap_emit)
def fake_drafts(prompt, *, n=4, style="auto", on_draft=None, is_cancelled=None):
# Simulate a Stop landing mid-run: the cooperative flag must read True.
server._pet_cancel_request(seen["token"])
assert is_cancelled() is True
return [] # bailed before producing anything
monkeypatch.setattr(gen, "generate_base_drafts", fake_drafts)
resp = server._methods["pet.generate"]("rc", {"prompt": "x", "count": 4})
assert "error" in resp
assert "cancel" in resp["error"]["message"].lower()
# The flag is released after the run so reusing the token isn't pre-cancelled.
assert server._pet_is_cancelled(seen["token"]) is False
def test_pet_hatch_validates_params():
assert "error" in server._methods["pet.hatch"]("r1", {"name": "x"}) # missing token
assert "error" in server._methods["pet.hatch"]("r2", {"token": "abc"}) # missing name
def test_pet_hatch_expired_draft():
resp = server._methods["pet.hatch"]("r3", {"token": "nope", "index": 0, "name": "Ghost"})
assert "error" in resp
assert "expired" in resp["error"]["message"]
def _fake_drafts_factory(tmp_path):
def fake_drafts(prompt, *, n=4, style="auto", on_draft=None, is_cancelled=None):
paths = []
for i in range(n):
p = tmp_path / f"d{i}.png"
_png(p)
paths.append(p)
if on_draft is not None:
on_draft(i, p)
return paths
return fake_drafts
def _fake_hatch_factory(captured):
"""A hatch that registers a real local pet (so the preview payload populates)."""
import agent.pet.generate as gen
from agent.pet import store
def fake_hatch(*, base_image, slug, display_name="", description="", concept="", style="auto", on_progress=None, provider=None, is_cancelled=None):
captured["base_image"] = str(base_image)
captured["slug"] = slug
pet = store.register_local_pet(
Image.new("RGBA", (192, 208), (10, 20, 30, 255)),
slug=slug,
display_name=display_name,
description=description,
)
return gen.HatchResult(
slug=pet.slug,
display_name=display_name or pet.display_name,
spritesheet=pet.spritesheet,
states=["idle", "wave"],
validation={"ok": True, "warnings": ["state 'jump' has no frames"]},
)
return fake_hatch
def test_pet_generate_then_hatch_previews_without_activating(monkeypatch, tmp_path):
import agent.pet.generate as gen
from agent.pet import store
captured = {}
monkeypatch.setattr(gen, "generate_base_drafts", _fake_drafts_factory(tmp_path))
monkeypatch.setattr(gen, "hatch_pet", _fake_hatch_factory(captured))
token = server._methods["pet.generate"]("r1", {"prompt": "a fox"})["result"]["token"]
resp = server._methods["pet.hatch"](
"r2",
{"token": token, "index": 1, "name": "My Fox", "description": "vulpine"},
)
result = resp["result"]
assert result["ok"]
assert result["slug"] == "my-fox"
assert result["displayName"] == "My Fox"
assert result["warnings"] == ["state 'jump' has no frames"]
# Hatched from the chosen draft index.
assert captured["base_image"].endswith("draft-1.png")
# The pet is installed on disk and the preview payload carries the sheet,
# but hatch must NOT activate it — adoption is a separate step.
assert store.load_pet("my-fox") is not None
assert result["pet"]["slug"] == "my-fox"
assert result["pet"]["spritesheetBase64"]
assert server._methods["pet.info"]("r3", {}).get("result", {}).get("enabled") in (False, None)
def test_pet_hatch_then_adopt_activates(monkeypatch, tmp_path):
import agent.pet.generate as gen
captured = {}
monkeypatch.setattr(gen, "generate_base_drafts", _fake_drafts_factory(tmp_path))
monkeypatch.setattr(gen, "hatch_pet", _fake_hatch_factory(captured))
activated = {}
monkeypatch.setattr("hermes_cli.pets._set_active", lambda slug: activated.setdefault("slug", slug))
token = server._methods["pet.generate"]("r1", {"prompt": "a fox"})["result"]["token"]
hatched = server._methods["pet.hatch"]("r2", {"token": token, "index": 0, "name": "My Fox"})["result"]
# Adoption is the existing pet.select path, against the now-installed slug.
adopt = server._methods["pet.select"]("r3", {"slug": hatched["slug"]})["result"]
assert adopt["ok"]
assert activated["slug"] == "my-fox"

View file

@ -184,6 +184,10 @@ _LONG_HANDLERS = frozenset(
# animation poll stutters. On the pool they run concurrently.
"pet.cells",
"pet.gallery",
# Generation is the heaviest pet path by far — multiple image-model
# round-trips per call — so it must never block the reader thread.
"pet.generate",
"pet.hatch",
"pet.select",
"pet.thumb",
"plugins.manage",
@ -5575,6 +5579,49 @@ def _pet_frame_counts(spritesheet) -> dict:
return {}
def _pet_config_scale() -> float:
"""Configured ``display.pet.scale`` (or the engine default), never raises."""
from agent.pet import constants
try:
from hermes_cli.config import load_config
cfg = load_config()
display = cfg.get("display", {}) if isinstance(cfg.get("display"), dict) else {}
pet_cfg = display.get("pet", {}) if isinstance(display.get("pet"), dict) else {}
return float(pet_cfg.get("scale", constants.DEFAULT_SCALE) or constants.DEFAULT_SCALE)
except Exception: # noqa: BLE001
return constants.DEFAULT_SCALE
def _pet_sprite_payload(pet, *, scale: float) -> dict:
"""Build the renderer payload (spritesheet bytes + geometry) for *pet*.
Shared by ``pet.info`` (the active mascot) and ``pet.hatch`` (the unadopted
preview) so both feed the desktop canvas / TUI from one shape.
"""
import base64
from agent.pet import constants
raw = pet.spritesheet.read_bytes()
suffix = pet.spritesheet.suffix.lower()
mime = "image/png" if suffix == ".png" else "image/webp"
return {
"slug": pet.slug,
"displayName": pet.display_name,
"mime": mime,
"spritesheetBase64": base64.standard_b64encode(raw).decode("ascii"),
"frameW": constants.FRAME_W,
"frameH": constants.FRAME_H,
"framesPerState": constants.FRAMES_PER_STATE,
"framesByState": _pet_frame_counts(pet.spritesheet),
"loopMs": constants.LOOP_MS,
"scale": scale,
"stateRows": _pet_state_rows(pet.spritesheet),
}
def _pet_state_rows(spritesheet) -> list[str]:
"""Row taxonomy for the concrete active pet sheet.
@ -5610,8 +5657,6 @@ def _(rid, params: dict) -> dict:
before the agent finishes building. Fail-open: returns ``enabled=False``
on any error rather than erroring the surface.
"""
import base64
try:
from agent.pet import constants, store
@ -5631,26 +5676,8 @@ def _(rid, params: dict) -> dict:
if not enabled or pet is None or not pet.exists:
return _ok(rid, {"enabled": False})
raw = pet.spritesheet.read_bytes()
suffix = pet.spritesheet.suffix.lower()
mime = "image/png" if suffix == ".png" else "image/webp"
return _ok(
rid,
{
"enabled": True,
"slug": pet.slug,
"displayName": pet.display_name,
"mime": mime,
"spritesheetBase64": base64.standard_b64encode(raw).decode("ascii"),
"frameW": constants.FRAME_W,
"frameH": constants.FRAME_H,
"framesPerState": constants.FRAMES_PER_STATE,
"framesByState": _pet_frame_counts(pet.spritesheet),
"loopMs": constants.LOOP_MS,
"scale": float(pet_cfg.get("scale", constants.DEFAULT_SCALE) or constants.DEFAULT_SCALE),
"stateRows": _pet_state_rows(pet.spritesheet),
},
)
scale = float(pet_cfg.get("scale", constants.DEFAULT_SCALE) or constants.DEFAULT_SCALE)
return _ok(rid, {"enabled": True, **_pet_sprite_payload(pet, scale=scale)})
except Exception as exc: # noqa: BLE001 - cosmetic, never break the surface
logger.debug("pet.info failed: %s", exc)
return _ok(rid, {"enabled": False})
@ -5770,7 +5797,12 @@ def _(rid, params: dict) -> dict:
current config (active slug + enabled). Agent-independent. Fail-open:
returns whatever is installed locally if the gallery can't be reached, so
the picker still works offline.
Param ``localOnly`` (bool): skip the remote petdex manifest fetch and return
only locally-installed pets. The desktop loads this first so the user's own
pets render instantly instead of waiting on the (possibly slow) manifest.
"""
local_only = bool(params.get("localOnly"))
try:
from agent.pet import store
@ -5788,9 +5820,14 @@ def _(rid, params: dict) -> dict:
gallery: list[dict] = []
seen: set[str] = set()
try:
from agent.pet.manifest import fetch_manifest
from agent.pet.manifest import fetch_manifest, prefetch
for entry in fetch_manifest():
# Local-only: skip the network entirely, but kick off a background
# warm so the follow-up full request usually hits a cached manifest.
if local_only:
prefetch()
for entry in [] if local_only else fetch_manifest():
seen.add(entry.slug)
gallery.append(
{
@ -5802,6 +5839,7 @@ def _(rid, params: dict) -> dict:
# hand-picked/official set, identified by the asset path)
# is the closest signal, so the picker can surface it first.
"curated": "/curated/" in entry.spritesheet_url,
"generated": entry.slug in installed and installed[entry.slug].generated,
}
)
except Exception as exc: # noqa: BLE001 - offline: fall back to installed
@ -5811,7 +5849,13 @@ def _(rid, params: dict) -> dict:
for slug, pet in installed.items():
if slug not in seen:
gallery.append(
{"slug": slug, "displayName": pet.display_name, "installed": True, "spritesheetUrl": ""}
{
"slug": slug,
"displayName": pet.display_name,
"installed": True,
"spritesheetUrl": "",
"generated": pet.generated,
}
)
return _ok(
@ -5884,6 +5928,71 @@ def _(rid, params: dict) -> dict:
return _err(rid, 5031, f"pet.remove failed: {exc}")
@method("pet.export")
@_profile_scoped
def _(rid, params: dict) -> dict:
"""Export an installed pet as a re-importable ``.zip`` (pet.json + sprite).
Params: ``slug`` (required). Returns ``{ok, filename, zipBase64}`` the
client decodes the base64 and saves it. Heavy-ish (reads + zips files) but
small; runs inline.
"""
slug = str(params.get("slug") or "").strip()
if not slug:
return _err(rid, 4004, "missing slug")
try:
import base64
from agent.pet import store
filename, data = store.export_pet(slug)
return _ok(
rid,
{"ok": True, "filename": filename, "zipBase64": base64.standard_b64encode(data).decode("ascii")},
)
except Exception as exc: # noqa: BLE001
logger.debug("pet.export failed: %s", exc)
return _err(rid, 5031, f"pet.export failed: {exc}")
@method("pet.rename")
@_profile_scoped
def _(rid, params: dict) -> dict:
"""Rename an installed pet's display name + realign its slug/dir.
Params: ``slug`` + ``name`` (both required). Lets the generate flow hatch
with a provisional name and apply the user's chosen name at adopt time.
Returns ``{ok, slug, displayName}`` with the (possibly new) slug.
"""
slug = str(params.get("slug") or "").strip()
name = str(params.get("name") or "").strip()
if not slug:
return _err(rid, 4004, "missing slug")
if not name:
return _err(rid, 4004, "missing name")
try:
from agent.pet import store
new_slug = store.rename_pet(slug, name)
if not new_slug:
return _err(rid, 5031, "pet.rename failed")
# The dir may have moved; if the renamed pet was active, follow the slug
# in config so surfaces don't point at the old (now-missing) directory.
if new_slug != slug:
try:
from hermes_cli.pets import _rename_active_if
_rename_active_if(slug, new_slug)
except Exception as exc: # noqa: BLE001 - rename already succeeded
logger.debug("pet.rename config update failed: %s", exc)
return _ok(rid, {"ok": True, "slug": new_slug, "displayName": name})
except Exception as exc: # noqa: BLE001
logger.debug("pet.rename failed: %s", exc)
return _err(rid, 5031, f"pet.rename failed: {exc}")
@method("pet.thumb")
@_profile_scoped
def _(rid, params: dict) -> dict:
@ -5954,6 +6063,259 @@ def _(rid, params: dict) -> dict:
return _err(rid, 5031, f"pet.scale failed: {exc}")
def _pet_gen_root():
"""Profile-scoped staging dir for in-progress generation drafts."""
from hermes_constants import get_hermes_home
root = get_hermes_home() / "cache" / "pet-gen"
root.mkdir(parents=True, exist_ok=True)
return root
def _pet_gen_sweep(root, *, max_age_s: float = 3600.0) -> None:
"""Drop stale draft staging dirs so cache never grows unbounded."""
import shutil
import time
try:
now = time.time()
for child in root.iterdir():
if child.is_dir() and now - child.stat().st_mtime > max_age_s:
shutil.rmtree(child, ignore_errors=True)
except Exception as exc: # noqa: BLE001 - cleanup is best-effort
logger.debug("pet-gen sweep failed: %s", exc)
def _pet_png_data_uri(path, *, max_px: int = 160) -> str:
"""Downscaled PNG data URI for a draft image (small preview payload)."""
import base64
import io
from PIL import Image
with Image.open(path) as opened:
img = opened.convert("RGBA")
img.thumbnail((max_px, max_px), Image.LANCZOS)
buf = io.BytesIO()
img.save(buf, format="PNG")
return "data:image/png;base64," + base64.standard_b64encode(buf.getvalue()).decode("ascii")
# Cooperative cancellation for the heavy pet generation paths. The client's Stop
# aborts its RPC immediately, but the worker-pool generation keeps running unless
# told to stop — pet.cancel flips a token's flag, which generate_base_drafts /
# hatch_pet poll between provider calls to skip work they haven't started.
_pet_cancel_lock = threading.Lock()
_pet_cancelled: set[str] = set()
def _pet_cancel_arm(token: str) -> None:
"""Clear a stale cancel flag at the start of a generate/hatch run."""
with _pet_cancel_lock:
_pet_cancelled.discard(token)
def _pet_cancel_request(token: str) -> None:
with _pet_cancel_lock:
_pet_cancelled.add(token)
def _pet_is_cancelled(token: str) -> bool:
with _pet_cancel_lock:
return token in _pet_cancelled
def _pet_cancel_release(token: str) -> None:
with _pet_cancel_lock:
_pet_cancelled.discard(token)
@method("pet.cancel")
def _(rid, params: dict) -> dict:
"""Signal an in-flight ``pet.generate``/``pet.hatch`` (by token) to stop.
Best-effort + idempotent: cancelling an unknown/finished token is a no-op.
Stays off the worker pool so it lands while a heavy generation is occupying
it. Returns ``{ok: True}``.
"""
token = str(params.get("token") or "").strip()
if token:
_pet_cancel_request(token)
return _ok(rid, {"ok": True})
@method("pet.generate")
def _(rid, params: dict) -> dict:
"""Generate candidate base looks for a new pet (the draft/variant step).
Params: ``prompt`` (required), ``count`` (default 4), ``style`` (default
``auto``). Returns ``{ok, token, drafts:[{index, dataUri}]}`` the token
keys the staged base images for a later ``pet.hatch``. Retry == call again
(fresh token). Heavy (network): runs on the worker pool.
"""
prompt = str(params.get("prompt") or "").strip()
if not prompt:
return _err(rid, 4004, "missing prompt")
try:
count = max(1, min(4, int(params.get("count") or 4)))
except (TypeError, ValueError):
count = 4
style = str(params.get("style") or "auto").strip() or "auto"
try:
import shutil
import uuid
from agent.pet.generate import generate_base_drafts
from agent.pet.generate.imagegen import GenerationError
root = _pet_gen_root()
_pet_gen_sweep(root)
# Token up front so each draft can be staged + streamed the moment it
# lands, instead of the user staring at a blank grid until all N finish.
token = uuid.uuid4().hex[:12]
_pet_cancel_arm(token)
stage = root / token
stage.mkdir(parents=True, exist_ok=True)
out: list[dict] = []
# Hand the token to the client up front (token-only init event) so a Stop
# fired before the first draft lands can still target this run.
try:
_emit("pet.generate.progress", "", {"token": token, "count": count})
except Exception as exc: # noqa: BLE001 - streaming is best-effort
logger.debug("pet.generate init emit failed: %s", exc)
def _on_draft(index: int, src) -> None:
dest = stage / f"draft-{index}.png"
try:
shutil.copyfile(src, dest)
data_uri = _pet_png_data_uri(dest)
except Exception as exc: # noqa: BLE001 - skip a bad draft, keep the rest
logger.debug("pet.generate draft %d failed: %s", index, exc)
return
out.append({"index": index, "dataUri": data_uri})
# Stream this draft to the client so the grid fills in live. Best-
# effort: a transport hiccup must not abort the generation itself.
try:
_emit(
"pet.generate.progress",
"",
{"token": token, "index": index, "dataUri": data_uri, "count": count},
)
except Exception as exc: # noqa: BLE001
logger.debug("pet.generate progress emit failed: %s", exc)
try:
generate_base_drafts(
prompt,
n=count,
style=style,
on_draft=_on_draft,
is_cancelled=lambda: _pet_is_cancelled(token),
)
except GenerationError as exc:
_pet_cancel_release(token)
return _err(rid, 5031, str(exc))
cancelled = _pet_is_cancelled(token)
_pet_cancel_release(token)
if cancelled:
return _err(rid, 5031, "generation cancelled")
if not out:
return _err(rid, 5031, "generation produced no usable drafts")
out.sort(key=lambda d: d["index"])
return _ok(rid, {"ok": True, "token": token, "drafts": out})
except Exception as exc: # noqa: BLE001
logger.debug("pet.generate failed: %s", exc)
return _err(rid, 5031, f"pet.generate failed: {exc}")
@method("pet.hatch")
def _(rid, params: dict) -> dict:
"""Turn a chosen base draft into a full pet — installed but NOT yet active.
Generation is expensive and the result varies, so hatch produces a *preview*
the surface plays (all frames) before the user commits: the pet is written to
the store (so it can be rendered + later activated) but the active pet is left
untouched. Adopt with ``pet.select`` or throw it away with ``pet.remove``.
Params: ``token`` + ``index`` (from ``pet.generate``), ``name`` (required),
``description`` (optional), ``prompt`` (optional concept for row prompts),
``style`` (optional). Returns ``{ok, slug, displayName, warnings, pet}`` where
``pet`` is the renderer payload. Heavy (network + raster): worker pool.
"""
token = str(params.get("token") or "").strip()
index = params.get("index", 0)
name = str(params.get("name") or "").strip()
if not token:
return _err(rid, 4004, "missing token")
if not name:
return _err(rid, 4004, "missing name")
try:
index = int(index)
except (TypeError, ValueError):
index = 0
try:
from agent.pet import store
from agent.pet.generate import hatch_pet
from agent.pet.generate.imagegen import GenerationError
base = _pet_gen_root() / token / f"draft-{index}.png"
if not base.is_file():
return _err(rid, 4004, "draft expired — generate again")
_pet_cancel_arm(token)
slug = store.unique_slug(name)
def _on_progress(event: str, detail: str) -> None:
# Row progress is encoded as "<state>:<done>:<total>" so the egg
# screen can show "Drawing <state>… (n/total)"; other phases
# (compose, save) pass through as-is. Best-effort streaming.
payload: dict = {"event": event, "detail": detail}
if event == "row" and detail.count(":") == 2:
state, done, total = detail.split(":")
payload = {"event": "row", "state": state, "done": done, "total": total}
try:
_emit("pet.hatch.progress", "", payload)
except Exception as exc: # noqa: BLE001
logger.debug("pet.hatch progress emit failed: %s", exc)
try:
result = hatch_pet(
base_image=base,
slug=slug,
display_name=name,
description=str(params.get("description") or ""),
concept=str(params.get("prompt") or name),
style=str(params.get("style") or "auto").strip() or "auto",
on_progress=_on_progress,
is_cancelled=lambda: _pet_is_cancelled(token),
)
except GenerationError as exc:
return _err(rid, 5031, str(exc))
finally:
_pet_cancel_release(token)
pet = store.load_pet(result.slug)
payload = _pet_sprite_payload(pet, scale=_pet_config_scale()) if pet else {}
return _ok(
rid,
{
"ok": True,
"slug": result.slug,
"displayName": result.display_name,
"warnings": result.validation.get("warnings", []),
"pet": payload,
},
)
except Exception as exc: # noqa: BLE001
logger.debug("pet.hatch failed: %s", exc)
return _err(rid, 5031, f"pet.hatch failed: {exc}")
@method("credits.view")
def _(rid, params: dict) -> dict:
"""Structured Nous credit view for the TUI /credits command.