From e7dbfdaad7b1bb45cd9eb9c09a03c0213c65320d Mon Sep 17 00:00:00 2001 From: Brooklyn Nicholson Date: Sat, 20 Jun 2026 14:18:30 -0500 Subject: [PATCH] feat(pets): pet engine + display.pet config Add the shared pet engine under agent/pet/: spritesheet manifest loading and in-process caching, six-state animation model, frame rendering, and the persistent pet store. Register the display.pet config block (pet, scale, enabled, etc.) that every surface reads from. Covered by tests/agent/test_pet_engine.py. --- agent/pet/__init__.py | 51 +++ agent/pet/constants.py | 167 +++++++++ agent/pet/manifest.py | 128 +++++++ agent/pet/render.py | 618 +++++++++++++++++++++++++++++++++ agent/pet/state.py | 81 +++++ agent/pet/store.py | 316 +++++++++++++++++ hermes_cli/config.py | 25 ++ tests/agent/test_pet_engine.py | 371 ++++++++++++++++++++ 8 files changed, 1757 insertions(+) create mode 100644 agent/pet/__init__.py create mode 100644 agent/pet/constants.py create mode 100644 agent/pet/manifest.py create mode 100644 agent/pet/render.py create mode 100644 agent/pet/state.py create mode 100644 agent/pet/store.py create mode 100644 tests/agent/test_pet_engine.py diff --git a/agent/pet/__init__.py b/agent/pet/__init__.py new file mode 100644 index 00000000000..b045598d2eb --- /dev/null +++ b/agent/pet/__init__.py @@ -0,0 +1,51 @@ +"""Petdex pet engine — shared core for the CLI, TUI, and desktop surfaces. + +Petdex (https://github.com/crafter-station/petdex) is a public gallery of +animated sprite "pets" for coding agents. Each pet is a ``pet.json`` plus a +``spritesheet.{webp,png}`` of 192×208 px cells. Current Codex/petdex sheets use +an 8-column × 9-row atlas; older Hermes/petdex sheets used an 8-row atlas. +Hermes infers the row taxonomy from the sheet and maps agent activity onto +idle/run/review/failed/wave/jump. + +This package is the **single source of truth** for the feature so the base +CLI (Python) and TUI (Ink, via ``tui_gateway``) never duplicate the hard +parts: + +- :mod:`agent.pet.constants` — frame geometry + the :class:`PetState` enum. +- :mod:`agent.pet.state` — map agent activity → a :class:`PetState`. +- :mod:`agent.pet.manifest` — fetch the public petdex manifest. +- :mod:`agent.pet.store` — install / list / resolve pets on disk + (profile-aware via ``get_hermes_home()``). +- :mod:`agent.pet.render` — decode a spritesheet and encode frames for a + terminal (kitty / iTerm2 / sixel graphics + protocols, with a Unicode half-block + fallback). + +Rendering in the Electron desktop is necessarily TypeScript (canvas), but it +reuses the same on-disk store and the same state semantics. + +The whole feature is a *display* concern: it adds no model tool, mutates no +system prompt or toolset, and therefore has zero effect on prompt caching. +""" + +from agent.pet.constants import ( + DEFAULT_SCALE, + FRAME_H, + FRAME_W, + FRAMES_PER_STATE, + LOOP_MS, + STATE_ROWS, + PetState, +) +from agent.pet.state import derive_pet_state + +__all__ = [ + "DEFAULT_SCALE", + "FRAME_H", + "FRAME_W", + "FRAMES_PER_STATE", + "LOOP_MS", + "STATE_ROWS", + "PetState", + "derive_pet_state", +] diff --git a/agent/pet/constants.py b/agent/pet/constants.py new file mode 100644 index 00000000000..a7e816c4012 --- /dev/null +++ b/agent/pet/constants.py @@ -0,0 +1,167 @@ +"""Pet sprite geometry + animation-state taxonomy. + +These values are the common petdex/Codex pet geometry. The real ``pet.json`` +usually only carries ``id``/``displayName``/``description``/``spritesheetPath``; +row taxonomy is inferred from the atlas shape so Hermes can render both legacy +8-row sheets and current 9-row Codex sheets. +""" + +from __future__ import annotations + +from enum import Enum + +# Frame geometry (pixels). Current Codex/petdex spritesheets are 8 columns x 9 +# rows (1536x1872), while older Hermes/petdex sheets used 9 columns x 8 rows +# (1728x1664). Renderers derive both row taxonomy and real column count from the +# concrete sheet, so either shape works. +FRAME_W = 192 +FRAME_H = 208 + +# Frames consumed per animation state (the petdex web app uses CSS +# ``steps(6)``). A sheet may physically contain more columns; we only step +# through the first ``FRAMES_PER_STATE``. +FRAMES_PER_STATE = 6 + +# Full-loop duration for one state, milliseconds (petdex default). +LOOP_MS = 1100 + +# Default on-screen scale relative to native frame size. ``display.pet.scale`` +# is the single master scalar: the desktop canvas multiplies its native pixels +# by it and every terminal surface derives its half-block/kitty column width +# from it (see :func:`cols_for_scale`), so one number shrinks all three +# interfaces together. (petdex's own clients render at 0.7; we default smaller +# so the kitty/GUI mascot stays a glanceable corner sprite. The half-block +# fallback can't shrink as far — see ``UNICODE_MIN_COLS`` — and clamps to its +# legibility floor instead.) +DEFAULT_SCALE = 0.33 + +# User-settable scale bounds (``/pet scale``, desktop slider). Floor keeps the +# pet clickable/visible; ceiling stops a fat-fingered value from filling the +# screen. The unicode fallback additionally clamps to ``UNICODE_MIN_COLS``. +MIN_SCALE = 0.1 +MAX_SCALE = 3.0 + + +def clamp_scale(scale: float) -> float: + """Clamp *scale* to ``[MIN_SCALE, MAX_SCALE]`` (the single validation point).""" + return max(MIN_SCALE, min(MAX_SCALE, scale)) + +# Terminal cells one native frame spans at ``scale == 1.0``. A cell is ~8px +# wide, a frame is ``FRAME_W`` (192) px → 24 cells. This mirrors the kitty +# graphics placement (``scaled_px // 8``) so at full scale every renderer agrees. +BASE_UNICODE_COLS = FRAME_W // 8 + +# Legibility floor for the half-block fallback. A half-block cell samples the +# sprite at only 1 horizontal + 2 vertical taps, so below this width a 192×208 +# pet collapses into an unreadable blob *regardless* of scale. kitty/GUI draw +# true pixels and have no such floor — that's why the same ``scale: 0.33`` is +# crisp there but mush in half-blocks. ``scale`` shrinks the unicode pet down +# TO this floor (and grows it above), instead of past it into noise. +UNICODE_MIN_COLS = 16 + + +def cols_for_scale(scale: float) -> int: + """Half-block width implied by *scale*, clamped to the legibility floor. + + Above the floor it tracks the kitty cell box (``scaled_px // 8``) so the two + renderers converge at larger sizes; below it the floor keeps the sprite + readable rather than letting it devolve into a blob. + """ + return max(UNICODE_MIN_COLS, round(BASE_UNICODE_COLS * (scale or DEFAULT_SCALE))) + + +def resolve_cols(scale: float, unicode_cols: int = 0) -> int: + """Resolve terminal width: explicit *unicode_cols* override, else from *scale*.""" + return int(unicode_cols) if unicode_cols and int(unicode_cols) > 0 else cols_for_scale(scale) + + +class PetState(str, Enum): + """Animation state a pet can be shown in. + + These are Hermes' activity state names. They are not always identical to the + source atlas row names: Codex-format pets use rows like ``jumping`` / + ``running`` while the UI keeps the shorter ``jump`` / ``run`` names. + """ + + IDLE = "idle" + WAVE = "wave" + RUN = "run" + FAILED = "failed" + REVIEW = "review" + JUMP = "jump" + WAITING = "waiting" + + +# Legacy Hermes/petdex row order (top -> bottom) used by the older 8-row, +# 9-column atlas shape. +LEGACY_STATE_ROWS: list[str] = [ + PetState.IDLE.value, + PetState.WAVE.value, + PetState.RUN.value, + PetState.FAILED.value, + PetState.REVIEW.value, + PetState.JUMP.value, + "extra1", + "extra2", +] + +# Current Petdex row order (top -> bottom) used by 1536x1872 atlases: +# 8 columns x 9 rows of 192x208 cells. +CODEX_STATE_ROWS: list[str] = [ + PetState.IDLE.value, + "running-right", + "running-left", + "waving", + "jumping", + PetState.FAILED.value, + PetState.WAITING.value, + "running", + PetState.REVIEW.value, +] + +# Default/fallback for callers without a sheet. Prefer the current 9-row Codex +# format because generated pets and the public Codex pet contract use it. +STATE_ROWS: list[str] = CODEX_STATE_ROWS + +# Canonical Hermes activity names -> accepted row-name aliases in descending +# preference. This keeps our internal state names stable (`wave`/`jump`/`run`) +# while matching Petdex's current `waving`/`jumping`/`running` taxonomy. +STATE_ALIASES: dict[str, tuple[str, ...]] = { + PetState.IDLE.value: (PetState.IDLE.value,), + PetState.WAVE.value: (PetState.WAVE.value, "waving"), + PetState.JUMP.value: (PetState.JUMP.value, "jumping"), + PetState.RUN.value: (PetState.RUN.value, "running"), + PetState.FAILED.value: (PetState.FAILED.value,), + PetState.REVIEW.value: (PetState.REVIEW.value,), + PetState.WAITING.value: (PetState.WAITING.value,), +} + + +def state_aliases_for(state: "PetState | str") -> tuple[str, ...]: + """Return accepted row-name aliases for *state* (always non-empty).""" + value = state.value if isinstance(state, PetState) else str(state) + aliases = STATE_ALIASES.get(value) + return aliases if aliases else (value,) + + +def state_rows_for_grid(row_count: int | None) -> list[str]: + """Return the row taxonomy for a spritesheet with *row_count* rows.""" + try: + rows = int(row_count or 0) + except (TypeError, ValueError): + rows = 0 + + if rows >= len(CODEX_STATE_ROWS): + return CODEX_STATE_ROWS + return LEGACY_STATE_ROWS + + +def state_row_index(state: "PetState | str", row_count: int | None = None) -> int: + """Return the spritesheet row index for *state* (clamped, never raises).""" + rows = state_rows_for_grid(row_count) + for name in state_aliases_for(state): + try: + return rows.index(name) + except ValueError: + continue + return 0 # fall back to the idle row diff --git a/agent/pet/manifest.py b/agent/pet/manifest.py new file mode 100644 index 00000000000..94d539691dc --- /dev/null +++ b/agent/pet/manifest.py @@ -0,0 +1,128 @@ +"""Fetch the public petdex manifest. + +``https://petdex.dev/api/manifest`` 307-redirects to a JSON document on R2: + + { + "generatedAt": "...", + "total": 2926, + "pets": [ + {"slug": "boba", "displayName": "Boba", "kind": "creature", + "submittedBy": "railly", + "spritesheetUrl": "https://assets.petdex.dev/.../spritesheet.webp", + "petJsonUrl": "https://assets.petdex.dev/.../pet.json", + "zipUrl": "https://assets.petdex.dev/.../boba.zip"}, + ... + ] + } + +Read-only and unauthenticated; no credentials involved. +""" + +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass + +logger = logging.getLogger(__name__) + +MANIFEST_URL = "https://petdex.dev/api/manifest" + +_DEFAULT_TIMEOUT = 20.0 + +# In-process cache for the (large, slow, identical-per-call) manifest. The list +# is a static CDN object that barely changes, yet a single session can ask for +# it many times — every gallery open, plus a full re-fetch per install/select +# (``find_entry``). A short TTL collapses those into one network hit without +# going stale for long. Cleared by :func:`clear_cache` (tests). +_MANIFEST_TTL = 300.0 +_cache: tuple[float, list[ManifestEntry]] | None = None + + +def clear_cache() -> None: + """Drop the cached manifest (forces the next fetch to hit the network).""" + global _cache + _cache = None + + +@dataclass(frozen=True) +class ManifestEntry: + """A single pet's row in the manifest.""" + + slug: str + display_name: str + kind: str + submitted_by: str + spritesheet_url: str + pet_json_url: str + zip_url: str + + @classmethod + def from_dict(cls, data: dict) -> "ManifestEntry": + return cls( + slug=str(data.get("slug", "")).strip(), + display_name=str(data.get("displayName", "") or data.get("slug", "")), + kind=str(data.get("kind", "") or "pet"), + submitted_by=str(data.get("submittedBy", "") or ""), + spritesheet_url=str(data.get("spritesheetUrl", "") or ""), + pet_json_url=str(data.get("petJsonUrl", "") or ""), + zip_url=str(data.get("zipUrl", "") or ""), + ) + + +class ManifestError(RuntimeError): + """Raised when the manifest can't be fetched or parsed.""" + + +def fetch_manifest(*, timeout: float = _DEFAULT_TIMEOUT, force: bool = False) -> list[ManifestEntry]: + """Return every approved pet from the public manifest. + + Cached in-process for ``_MANIFEST_TTL`` seconds (pass ``force=True`` to + bypass). Follows the 307 redirect to R2. Raises :class:`ManifestError` on + any network/parse failure so callers can surface a clean message. + """ + global _cache + + if not force and _cache is not None and time.monotonic() - _cache[0] < _MANIFEST_TTL: + return _cache[1] + + try: + import httpx + except ImportError as exc: # pragma: no cover - httpx is a core dep + raise ManifestError("httpx is required to fetch the petdex manifest") from exc + + try: + resp = httpx.get( + MANIFEST_URL, + timeout=timeout, + follow_redirects=True, + headers={"User-Agent": "hermes-agent-petdex"}, + ) + resp.raise_for_status() + payload = resp.json() + except Exception as exc: # noqa: BLE001 - normalize to one error type + raise ManifestError(f"could not fetch petdex manifest: {exc}") from exc + + pets = payload.get("pets") if isinstance(payload, dict) else None + if not isinstance(pets, list): + raise ManifestError("petdex manifest had no 'pets' array") + + entries: list[ManifestEntry] = [] + for raw in pets: + if not isinstance(raw, dict): + continue + entry = ManifestEntry.from_dict(raw) + if entry.slug and entry.spritesheet_url: + entries.append(entry) + + _cache = (time.monotonic(), entries) + return entries + + +def find_entry(slug: str, *, timeout: float = _DEFAULT_TIMEOUT) -> ManifestEntry | None: + """Return the manifest entry for *slug*, or ``None`` if not listed.""" + slug = slug.strip().lower() + for entry in fetch_manifest(timeout=timeout): + if entry.slug.lower() == slug: + return entry + return None diff --git a/agent/pet/render.py b/agent/pet/render.py new file mode 100644 index 00000000000..1618c0751d2 --- /dev/null +++ b/agent/pet/render.py @@ -0,0 +1,618 @@ +"""Decode a pet spritesheet and encode frames for a terminal. + +Shared by the base CLI (writes the escape bytes to its own stdout) and the +TUI (``tui_gateway`` ships the encoded bytes to Ink, which writes them) so the +decode + capability-detection + protocol-encoding logic exists exactly once. + +Supported output modes, in fidelity order: + +- ``kitty`` — the kitty graphics protocol (kitty, Ghostty, WezTerm). +- ``iterm`` — iTerm2 inline images (iTerm2, WezTerm). +- ``sixel`` — DEC sixel (xterm -ti vt340, foot, mlterm, WezTerm, …). +- ``unicode`` — 24-bit half-block downscale; works in any truecolor terminal. + +Frame decoding requires Pillow (a core Hermes dependency). If Pillow or the +spritesheet is unavailable the renderer degrades to ``unicode`` text or an +empty string rather than raising. +""" + +from __future__ import annotations + +import base64 +import io +import logging +import os +import sys +from functools import lru_cache +from pathlib import Path + +from agent.pet.constants import ( + DEFAULT_SCALE, + FRAME_H, + FRAME_W, + FRAMES_PER_STATE, + PetState, + state_row_index, +) + +logger = logging.getLogger(__name__) + +# Public render-mode names accepted by ``display.pet.render_mode``. +RENDER_MODES = ("auto", "kitty", "iterm", "sixel", "unicode", "off") + + +# ───────────────────────────────────────────────────────────────────────── +# Terminal capability detection +# ───────────────────────────────────────────────────────────────────────── + +def detect_terminal_graphics() -> str: + """Best-effort detection of the richest graphics protocol available. + + Env-based (non-blocking — we never issue a DA1/terminal query that could + hang a pipe). Returns one of ``kitty`` / ``iterm`` / ``sixel`` / + ``unicode``. Conservative: unknown terminals get ``unicode``, which works + anywhere with truecolor. + """ + term = os.environ.get("TERM", "").lower() + term_program = os.environ.get("TERM_PROGRAM", "").lower() + + # The VS Code / Cursor integrated terminal sets TERM_PROGRAM=vscode + # authoritatively but does NOT scrub the terminal env vars it inherits when + # launched from another emulator (ITERM_SESSION_ID, KITTY_WINDOW_ID, …). + # Trusting those leaks emits an image protocol the embedded xterm.js can't + # display — you get a blank frame. Inline images there are opt-in + # (terminal.integrated.enableImages), so default to half-blocks, which + # always render in its truecolor grid. Users who enabled images can pin + # display.pet.render_mode explicitly. + if term_program == "vscode": + return "unicode" + + # kitty graphics protocol + if os.environ.get("KITTY_WINDOW_ID") or "kitty" in term or "ghostty" in term: + return "kitty" + if term_program in {"ghostty"}: + return "kitty" + + # WezTerm speaks both kitty and iterm; prefer kitty (richer placement). + if term_program == "wezterm" or os.environ.get("WEZTERM_PANE"): + return "kitty" + + # iTerm2 inline images + if term_program == "iterm.app" or os.environ.get("ITERM_SESSION_ID"): + return "iterm" + + # sixel-capable terminals (env heuristics only) + if term_program in {"mintty"} or "foot" in term or "mlterm" in term: + return "sixel" + if "sixel" in term: + return "sixel" + + return "unicode" + + +def resolve_mode(configured: str | None, *, stream=None) -> str: + """Resolve the effective render mode from config + the environment. + + ``configured`` is ``display.pet.render_mode`` (``auto`` → detect). Returns + ``off`` when not attached to a TTY (no point emitting graphics into a pipe + or logfile). + """ + mode = (configured or "auto").strip().lower() + if mode not in RENDER_MODES: + mode = "auto" + if mode == "off": + return "off" + + stream = stream or sys.stdout + try: + if not (hasattr(stream, "isatty") and stream.isatty()): + return "off" + except (ValueError, OSError): + return "off" + + if mode == "auto": + return detect_terminal_graphics() + return mode + + +# ───────────────────────────────────────────────────────────────────────── +# Frame decoding +# ───────────────────────────────────────────────────────────────────────── + +def _open_sheet(path: Path): + from PIL import Image + + img = Image.open(path) + return img.convert("RGBA") + + +# Max alpha at/below which a frame counts as blank padding. petdex sheets are +# left-packed: a state with fewer real frames than ``FRAMES_PER_STATE`` fills +# the trailing columns with fully transparent cells. Animating into one flashes +# the pet blank, so we stop the row at the first such gap. +_BLANK_ALPHA = 8 + + +def _frame_is_blank(frame) -> bool: + """True if *frame* has no meaningfully opaque pixel (transparent padding).""" + return frame.getchannel("A").getextrema()[1] <= _BLANK_ALPHA + + +@lru_cache(maxsize=16) +def _raw_frames( + sheet_path: str, + state_value: str, + frame_w: int, + frame_h: int, + frames_per_state: int, +) -> tuple: + """Cropped, padding-trimmed RGBA frames for one state row (unscaled). + + Steps across the row until the first blank column so pets with ragged + per-state frame counts never animate into empty padding. Cached; returns + ``()`` on any decode failure. + """ + try: + sheet = _open_sheet(Path(sheet_path)) + cols = max(1, sheet.width // frame_w) + rows = max(1, sheet.height // frame_h) + row = state_row_index(state_value, rows) + top = row * frame_h + # Clamp the row to the sheet (some pets ship fewer rows than the 8 the + # taxonomy reserves). + if top + frame_h > sheet.height: + top = max(0, sheet.height - frame_h) + + frames = [] + for i in range(min(frames_per_state, cols)): + left = i * frame_w + frame = sheet.crop((left, top, left + frame_w, top + frame_h)) + if _frame_is_blank(frame): + break # trailing transparent padding — real frames end here + frames.append(frame) + return tuple(frames) + except Exception as exc: # noqa: BLE001 - cosmetic feature, never fatal + logger.debug("pet frame decode failed (%s, %s): %s", sheet_path, state_value, exc) + return () + + +@lru_cache(maxsize=8) +def _frames_for( + sheet_path: str, + state_value: str, + frame_w: int, + frame_h: int, + frames_per_state: int, + scale_w: int, + scale_h: int, +): + """Return padding-trimmed RGBA frames for one state row, scaled. + + Thin scaling layer over :func:`_raw_frames`; both are cached so repeated + frame requests during animation are free. + """ + raw = _raw_frames(sheet_path, state_value, frame_w, frame_h, frames_per_state) + if not raw or (scale_w, scale_h) == (frame_w, frame_h): + return list(raw) + from PIL import Image + + return [f.resize((scale_w, scale_h), Image.LANCZOS) for f in raw] + + +def state_frame_counts( + sheet_path: str | Path, + *, + frame_w: int = FRAME_W, + frame_h: int = FRAME_H, + frames_per_state: int = FRAMES_PER_STATE, +) -> dict[str, int]: + """Map each driven :class:`PetState` → its real (padding-trimmed) frame count. + + The single source of truth for "how many frames does this state actually + have?". The CLI/TUI consume the trimmed frame lists directly; the gateway + ships this map to the desktop canvas, which steps its own loop. + """ + return { + state.value: len( + _raw_frames(str(sheet_path), state.value, frame_w, frame_h, frames_per_state) + ) + for state in PetState + } + + +# ───────────────────────────────────────────────────────────────────────── +# Encoders +# ───────────────────────────────────────────────────────────────────────── + +def _png_bytes(frame) -> bytes: + buf = io.BytesIO() + frame.save(buf, format="PNG") + return buf.getvalue() + + +def _kitty_apc(ctrl: str, data: str) -> str: + """Emit a kitty APC escape for *data*, chunked into ≤4096-byte ``m`` pieces.""" + chunk = 4096 + if len(data) <= chunk: + return f"\x1b_G{ctrl},m=0;{data}\x1b\\" + out = [f"\x1b_G{ctrl},m=1;{data[:chunk]}\x1b\\"] + rest = data[chunk:] + while rest: + piece, rest = rest[:chunk], rest[chunk:] + out.append(f"\x1b_Gm={1 if rest else 0};{piece}\x1b\\") + return "".join(out) + + +def _encode_kitty(frame, *, cell_cols: int | None = None, cell_rows: int | None = None) -> str: + """Encode one frame via the kitty graphics protocol (transmit + display). + + ``a=T`` transmits & displays at the cursor; ``c``/``r`` request a display + box in terminal cells so successive frames overwrite the same area. + """ + ctrl = "f=100,a=T,q=2" + if cell_cols: + ctrl += f",c={cell_cols}" + if cell_rows: + ctrl += f",r={cell_rows}" + return _kitty_apc(ctrl, base64.standard_b64encode(_png_bytes(frame)).decode("ascii")) + + +# ───────────────────────────────────────────────────────────────────────── +# kitty Unicode placeholders +# +# Ink (the TUI's React-for-terminal layer) owns the screen and measures every +# cell's width, so it can't host raw kitty image escapes (no width to count, +# clobbered on the next repaint). kitty's *Unicode placeholder* protocol is the +# grid-safe path: transmit the image once (q=2, virtual placement U=1), then the +# host app prints ordinary-width placeholder cells (U+10EEEE + diacritics) whose +# foreground color encodes the image id. Ink counts those as width-1 text, so +# layout stays correct and the terminal paints the image underneath. +# https://sw.kovidgoyal.net/kitty/graphics-protocol/#unicode-placeholders +# ───────────────────────────────────────────────────────────────────────── + +_KITTY_PLACEHOLDER = "\U0010eeee" + +# Row/column diacritics, in order (index → diacritic). Verbatim from kitty's +# gen/rowcolumn-diacritics.txt (Unicode 6.0.0, combining class 230). Index i is +# the diacritic that encodes the number i; we only ever need the row index. +_ROWCOL_DIACRITICS: tuple[int, ...] = ( + 0x0305, 0x030D, 0x030E, 0x0310, 0x0312, 0x033D, 0x033E, 0x033F, 0x0346, 0x034A, + 0x034B, 0x034C, 0x0350, 0x0351, 0x0352, 0x0357, 0x035B, 0x0363, 0x0364, 0x0365, + 0x0366, 0x0367, 0x0368, 0x0369, 0x036A, 0x036B, 0x036C, 0x036D, 0x036E, 0x036F, + 0x0483, 0x0484, 0x0485, 0x0486, 0x0487, 0x0592, 0x0593, 0x0594, 0x0595, 0x0597, + 0x0598, 0x0599, 0x059C, 0x059D, 0x059E, 0x059F, 0x05A0, 0x05A1, 0x05A8, 0x05A9, + 0x05AB, 0x05AC, 0x05AF, 0x05C4, 0x0610, 0x0611, 0x0612, 0x0613, 0x0614, 0x0615, + 0x0616, 0x0617, 0x0657, 0x0658, 0x0659, 0x065A, 0x065B, 0x065D, 0x065E, 0x06D6, + 0x06D7, 0x06D8, 0x06D9, 0x06DA, 0x06DB, 0x06DC, 0x06DF, 0x06E0, 0x06E1, 0x06E2, + 0x06E4, 0x06E7, 0x06E8, 0x06EB, 0x06EC, 0x0730, 0x0732, 0x0733, 0x0735, 0x0736, + 0x073A, 0x073D, 0x073F, 0x0740, 0x0741, 0x0743, 0x0745, 0x0747, 0x0749, 0x074A, + 0x07EB, 0x07EC, 0x07ED, 0x07EE, 0x07EF, 0x07F0, 0x07F1, 0x07F3, 0x0816, 0x0817, + 0x0818, 0x0819, 0x081B, 0x081C, 0x081D, 0x081E, 0x081F, 0x0820, 0x0821, 0x0822, + 0x0823, 0x0825, 0x0826, 0x0827, 0x0829, 0x082A, 0x082B, 0x082C, 0x082D, 0x0951, + 0x0953, 0x0954, 0x0F82, 0x0F83, 0x0F86, 0x0F87, 0x135D, 0x135E, 0x135F, 0x17DD, + 0x193A, 0x1A17, 0x1A75, 0x1A76, 0x1A77, 0x1A78, 0x1A79, 0x1A7A, 0x1A7B, 0x1A7C, + 0x1B6B, 0x1B6D, 0x1B6E, 0x1B6F, 0x1B70, 0x1B71, 0x1B72, 0x1B73, 0x1CD0, 0x1CD1, + 0x1CD2, 0x1CDA, 0x1CDB, 0x1CE0, 0x1DC0, 0x1DC1, 0x1DC3, 0x1DC4, 0x1DC5, 0x1DC6, + 0x1DC7, 0x1DC8, 0x1DC9, 0x1DCB, 0x1DCC, 0x1DD1, 0x1DD2, 0x1DD3, 0x1DD4, 0x1DD5, + 0x1DD6, 0x1DD7, 0x1DD8, 0x1DD9, 0x1DDA, 0x1DDB, 0x1DDC, 0x1DDD, 0x1DDE, 0x1DDF, + 0x1DE0, 0x1DE1, 0x1DE2, 0x1DE3, 0x1DE4, 0x1DE5, 0x1DE6, 0x1DFE, 0x20D0, 0x20D1, + 0x20D4, 0x20D5, 0x20D6, 0x20D7, 0x20DB, 0x20DC, 0x20E1, 0x20E7, 0x20E9, 0x20F0, + 0x2CEF, 0x2CF0, 0x2CF1, 0x2DE0, 0x2DE1, 0x2DE2, 0x2DE3, 0x2DE4, 0x2DE5, 0x2DE6, + 0x2DE7, 0x2DE8, 0x2DE9, 0x2DEA, 0x2DEB, 0x2DEC, 0x2DED, 0x2DEE, 0x2DEF, 0x2DF0, + 0x2DF1, 0x2DF2, 0x2DF3, 0x2DF4, 0x2DF5, 0x2DF6, 0x2DF7, 0x2DF8, 0x2DF9, 0x2DFA, + 0x2DFB, 0x2DFC, 0x2DFD, 0x2DFE, 0x2DFF, 0xA66F, 0xA67C, 0xA67D, 0xA6F0, 0xA6F1, + 0xA8E0, 0xA8E1, 0xA8E2, 0xA8E3, 0xA8E4, 0xA8E5, 0xA8E6, 0xA8E7, 0xA8E8, 0xA8E9, + 0xA8EA, 0xA8EB, 0xA8EC, 0xA8ED, 0xA8EE, 0xA8EF, 0xA8F0, 0xA8F1, 0xAAB0, 0xAAB2, + 0xAAB3, 0xAAB7, 0xAAB8, 0xAABE, 0xAABF, 0xAAC1, 0xFE20, 0xFE21, 0xFE22, 0xFE23, + 0xFE24, 0xFE25, 0xFE26, 0x10A0F, 0x10A38, 0x1D185, 0x1D186, 0x1D187, 0x1D188, + 0x1D189, 0x1D1AA, 0x1D1AB, 0x1D1AC, 0x1D1AD, 0x1D242, 0x1D243, 0x1D244, +) + + +def kitty_image_id(slug: str) -> int: + """Stable per-pet image id in ``[1, 0x7FFF]``. + + The id is encoded in the placeholder's 24-bit foreground color, so it must + be non-zero and fit comfortably under ``0xFFFFFF``. A small CRC keeps it + deterministic per slug (so re-renders reuse the same terminal-side image) + while making collisions between two different pets unlikely. + """ + import zlib + + return (zlib.crc32(slug.encode("utf-8")) % 0x7FFE) + 1 + + +def kitty_color_hex(image_id: int) -> str: + """Hex foreground color (``#rrggbb``) that encodes *image_id* for kitty.""" + return "#%06x" % (image_id & 0xFFFFFF) + + +def kitty_placeholder_rows(cols: int, rows: int) -> list[str]: + """Build the placeholder text grid for an *rows*×*cols* image. + + Each line is one row of the grid: the first cell carries the row diacritic + (column defaults to 0), and the remaining ``cols-1`` bare placeholders let + the terminal auto-increment the column. The foreground color (the image id) + is applied by the caller / Ink, not embedded here. + """ + cols = max(1, cols) + out: list[str] = [] + for r in range(max(1, rows)): + idx = min(r, len(_ROWCOL_DIACRITICS) - 1) + first = _KITTY_PLACEHOLDER + chr(_ROWCOL_DIACRITICS[idx]) + out.append(first + _KITTY_PLACEHOLDER * (cols - 1)) + return out + + +def _encode_kitty_virtual(frame, *, image_id: int, cols: int, rows: int) -> str: + """Transmit a frame as a kitty *virtual* placement for Unicode placeholders. + + ``a=T`` transmits and creates the placement in one shot; ``U=1`` marks it + virtual (no on-screen output, cursor untouched); ``q=2`` suppresses the + terminal's OK/error replies that would otherwise corrupt the host app's + output. Re-sending with the same ``i`` replaces the image, so the static + placeholder cells animate underneath. + """ + ctrl = f"a=T,U=1,i={image_id},c={cols},r={rows},f=100,q=2" + return _kitty_apc(ctrl, base64.standard_b64encode(_png_bytes(frame)).decode("ascii")) + + +def _encode_iterm(frame, *, cell_cols: int | None = None, cell_rows: int | None = None) -> str: + """Encode one frame as an iTerm2 inline image (OSC 1337 File).""" + payload = base64.standard_b64encode(_png_bytes(frame)).decode("ascii") + size = len(payload) + args = [f"inline=1", f"size={size}", "preserveAspectRatio=1"] + if cell_cols: + args.append(f"width={cell_cols}") + if cell_rows: + args.append(f"height={cell_rows}") + return f"\x1b]1337;File={';'.join(args)}:{payload}\x07" + + +def _encode_sixel(frame) -> str: + """Encode one frame as DEC sixel. + + Quantizes to an adaptive palette (≤255 colors) and emits the sixel band + stream. Pillow has no sixel writer, so this is a compact hand-rolled + encoder. Transparent pixels render as background (color register skipped). + """ + from PIL import Image + + rgba = frame + # Composite onto transparent-as-skip: track alpha to decide background. + pal = rgba.convert("RGB").quantize(colors=255, method=Image.MEDIANCUT) + palette = pal.getpalette() or [] + px = pal.load() + alpha = rgba.getchannel("A").load() + w, h = pal.size + + out = ["\x1bP0;1;0q", '"1;1;%d;%d' % (w, h)] + # Color register definitions (sixel uses 0..100 scale). + used = sorted({px[x, y] for y in range(h) for x in range(w)}) + for idx in used: + r = palette[idx * 3] if idx * 3 < len(palette) else 0 + g = palette[idx * 3 + 1] if idx * 3 + 1 < len(palette) else 0 + b = palette[idx * 3 + 2] if idx * 3 + 2 < len(palette) else 0 + out.append("#%d;2;%d;%d;%d" % (idx, r * 100 // 255, g * 100 // 255, b * 100 // 255)) + + # Emit in 6-row bands. + for band in range(0, h, 6): + for color_idx in used: + line = ["#%d" % color_idx] + run_char = None + run_len = 0 + + def flush(): + nonlocal run_char, run_len + if run_char is None: + return + if run_len > 3: + line.append("!%d%s" % (run_len, run_char)) + else: + line.append(run_char * run_len) + run_char, run_len = None, 0 + + for x in range(w): + bits = 0 + for bit in range(6): + y = band + bit + if y < h and alpha[x, y] > 32 and px[x, y] == color_idx: + bits |= 1 << bit + ch = chr(63 + bits) + if ch == run_char: + run_len += 1 + else: + flush() + run_char, run_len = ch, 1 + flush() + out.append("".join(line) + "$") # carriage return within band + out.append("-") # next band + out.append("\x1b\\") + return "".join(out) + + +_HALF_BLOCK = "▀" + +# A single half-block cell: top pixel + bottom pixel as (r, g, b, a) tuples. +Cell = tuple[tuple[int, int, int, int], tuple[int, int, int, int]] + + +def _downscale_cells(frame, *, target_cols: int) -> list[list[Cell]]: + """Downscale a frame to a grid of half-block cells. + + Each cell pairs a top and bottom pixel so one terminal row encodes two + pixel rows. Returns rows of ``((tr,tg,tb,ta),(br,bg,bb,ba))`` — the + framework-neutral representation shared by the ANSI encoder (CLI) and the + structured ``cells`` API (Ink). + """ + from PIL import Image + + target_cols = max(4, target_cols) + aspect = frame.height / max(1, frame.width) + target_rows = max(2, int(round(target_cols * aspect * 0.5)) * 2) + small = frame.resize((target_cols, target_rows), Image.LANCZOS).convert("RGBA") + px = small.load() + + grid: list[list[Cell]] = [] + for y in range(0, target_rows, 2): + row: list[Cell] = [] + for x in range(target_cols): + top = px[x, y] + bottom = px[x, y + 1] if y + 1 < target_rows else (0, 0, 0, 0) + row.append((top, bottom)) + grid.append(row) + return grid + + +def _encode_unicode(frame, *, target_cols: int) -> str: + """Downscale to truecolor ANSI half-blocks (one char = 2 vertical pixels).""" + lines: list[str] = [] + for row in _downscale_cells(frame, target_cols=target_cols): + cells: list[str] = [] + for (tr, tg, tb, ta), (br, bg, bb, ba) in row: + if ta < 32 and ba < 32: + cells.append("\x1b[0m ") # fully transparent → blank + continue + cells.append(f"\x1b[38;2;{tr};{tg};{tb}m\x1b[48;2;{br};{bg};{bb}m{_HALF_BLOCK}") + lines.append("".join(cells) + "\x1b[0m") + return "\n".join(lines) + + +# ───────────────────────────────────────────────────────────────────────── +# Public renderer +# ───────────────────────────────────────────────────────────────────────── + +class PetRenderer: + """Holds a pet's spritesheet and yields encoded frames per (state, index). + + Construct once per pet, then call :meth:`frame` on an animation timer. + Cheap to call repeatedly — decoded frames are cached. + """ + + def __init__( + self, + spritesheet: str | Path, + *, + mode: str = "unicode", + scale: float = DEFAULT_SCALE, + unicode_cols: int = 20, + frame_w: int = FRAME_W, + frame_h: int = FRAME_H, + frames_per_state: int = FRAMES_PER_STATE, + ) -> None: + self.spritesheet = str(spritesheet) + self.mode = mode if mode in RENDER_MODES else "unicode" + self.scale = scale + self.unicode_cols = unicode_cols + self.frame_w = frame_w + self.frame_h = frame_h + self.frames_per_state = frames_per_state + + @property + def available(self) -> bool: + return self.mode != "off" and Path(self.spritesheet).is_file() + + def frame_count(self, state: PetState | str) -> int: + return len(self._frames(state)) + + def _frames(self, state: PetState | str): + value = state.value if isinstance(state, PetState) else str(state) + scale_w = max(1, int(self.frame_w * self.scale)) + scale_h = max(1, int(self.frame_h * self.scale)) + return _frames_for( + self.spritesheet, + value, + self.frame_w, + self.frame_h, + self.frames_per_state, + scale_w, + scale_h, + ) + + def cells(self, state: PetState | str, index: int, *, cols: int | None = None) -> list[list[Cell]]: + """Return one frame as a half-block cell grid (framework-neutral). + + Used by the TUI, which renders the grid with native Ink color props + instead of raw ANSI. Returns ``[]`` when no frame is available. + """ + frames = self._frames(state) + if not frames: + return [] + frame = frames[index % len(frames)] + return _downscale_cells(frame, target_cols=cols or self.unicode_cols) + + def _cell_box(self, frame) -> tuple[int, int]: + """Terminal cell box for a scaled frame (~8×16 px per cell). + + Must match :meth:`frame` graphics sizing — kitty stretches the image to + fill ``c``×``r`` cells, so these must reflect the scaled pixel + dimensions, not a native-aspect column count (that upscales small pets). + """ + return max(1, frame.width // 8), max(1, frame.height // 16) + + def kitty_payload(self, state: PetState | str, *, image_id: int) -> dict | None: + """Build the kitty Unicode-placeholder payload for one state. + + Returns ``{cols, rows, placeholder, frames}`` where ``frames`` is a + list of transmit escapes (one per animation frame, all reusing + ``image_id``) and ``placeholder`` is the static text grid Ink paints. + Placement geometry is derived from the scaled frame pixels (via + :meth:`_cell_box`), not ``unicode_cols`` — kitty upscales to fill + ``c``×``r`` cells. ``None`` when no frame is available. + """ + frames = self._frames(state) + if not frames: + return None + cols, rows = self._cell_box(frames[0]) + return { + "cols": cols, + "rows": rows, + "placeholder": kitty_placeholder_rows(cols, rows), + "frames": [ + _encode_kitty_virtual(f, image_id=image_id, cols=cols, rows=rows) for f in frames + ], + } + + def frame(self, state: PetState | str, index: int) -> str: + """Return the encoded escape string for one frame, or ``""``. + + ``index`` is taken modulo the available frame count so callers can pass + a free-running counter. + """ + if self.mode == "off": + return "" + frames = self._frames(state) + if not frames: + return "" + frame = frames[index % len(frames)] + cell_cols, cell_rows = self._cell_box(frame) + + try: + if self.mode == "kitty": + return _encode_kitty(frame, cell_cols=cell_cols, cell_rows=cell_rows) + if self.mode == "iterm": + return _encode_iterm(frame, cell_cols=cell_cols, cell_rows=cell_rows) + if self.mode == "sixel": + return _encode_sixel(frame) + return _encode_unicode(frame, target_cols=self.unicode_cols) + except Exception as exc: # noqa: BLE001 - degrade silently + logger.debug("pet frame encode failed (mode=%s): %s", self.mode, exc) + return "" + + +def build_renderer( + spritesheet: str | Path, + *, + configured_mode: str | None = None, + scale: float = DEFAULT_SCALE, + unicode_cols: int = 20, + stream=None, +) -> PetRenderer: + """Convenience factory: resolve the mode from config+env, then construct.""" + mode = resolve_mode(configured_mode, stream=stream) + return PetRenderer( + spritesheet, + mode=mode, + scale=scale, + unicode_cols=unicode_cols, + ) diff --git a/agent/pet/state.py b/agent/pet/state.py new file mode 100644 index 00000000000..a9ad5afd801 --- /dev/null +++ b/agent/pet/state.py @@ -0,0 +1,81 @@ +"""Map agent activity → a :class:`PetState`. + +This is the one place the "what is the agent doing right now?" → "which +animation row?" decision lives. Each surface feeds it the signals it already +tracks: + +- CLI — ``KawaiiSpinner`` waiting/thinking state + tool outcomes. +- TUI — gateway ``tool.start/complete`` + ``message.delta/complete`` events. +- Desktop — the ``$busy``/``$awaitingResponse``/tool-event nanostores + (re-implemented in TS, but mirroring this priority order). + +Keeping the priority order here (and documenting it) lets the TypeScript +mirror stay faithful without a second design. +""" + +from __future__ import annotations + +from collections.abc import Iterable +from typing import Any + +from agent.pet.constants import PetState + + +def todos_all_done(todos: Iterable[Any] | None) -> bool: + """True iff there's ≥1 todo and every one is completed/cancelled. + + The "celebrate" beat (``JUMP``) fires when a plan finishes; this mirrors + the TUI's ``isTodoDone`` so the trigger is defined once across surfaces. + Accepts dicts (``{"status": ...}``) or objects with a ``status`` attr. + """ + items = list(todos or []) + if not items: + return False + + def _status(t: Any) -> Any: + return t.get("status") if isinstance(t, dict) else getattr(t, "status", None) + + return all(_status(t) in ("completed", "cancelled") for t in items) + + +def derive_pet_state( + *, + busy: bool = False, + awaiting_input: bool = False, + error: bool = False, + celebrate: bool = False, + just_completed: bool = False, + tool_running: bool = False, + reasoning: bool = False, +) -> PetState: + """Resolve the animation state from coarse activity signals. + + Priority (highest first) — only one row can show at a time, so the most + salient signal wins: + + 1. ``error`` → ``FAILED`` (a tool/turn just failed) + 2. ``celebrate`` → ``JUMP`` (explicit success beat, e.g. todos done) + 3. ``just_completed`` → ``WAVE`` (turn finished cleanly / greeting) + 4. ``awaiting_input`` → ``WAITING`` (blocked on the user — a clarify/approval + prompt is open; this outranks the in-flight signals below because the turn + is paused on *you*, even though a tool is technically mid-call) + 5. ``tool_running`` → ``RUN`` (a tool is executing) + 6. ``reasoning`` → ``REVIEW`` (model is thinking / reading) + 7. ``busy`` → ``RUN`` (turn in flight, unspecified work) + 8. otherwise → ``IDLE`` + """ + if error: + return PetState.FAILED + if celebrate: + return PetState.JUMP + if just_completed: + return PetState.WAVE + if awaiting_input: + return PetState.WAITING + if tool_running: + return PetState.RUN + if reasoning: + return PetState.REVIEW + if busy: + return PetState.RUN + return PetState.IDLE diff --git a/agent/pet/store.py b/agent/pet/store.py new file mode 100644 index 00000000000..46cc3bc9cac --- /dev/null +++ b/agent/pet/store.py @@ -0,0 +1,316 @@ +"""On-disk pet store — install / list / resolve pets. + +Pets live under ``get_hermes_home()/pets//`` so every profile gets its +own set (we deliberately do **not** reuse petdex's ``~/.codex/pets`` default — +that's owned by the petdex npm CLI and isn't profile-aware). Each installed +pet directory holds: + + pets// + pet.json # {id, displayName, description, spritesheetPath} + spritesheet.webp # (or .png) + +The active pet is resolved from the caller-supplied ``display.pet.slug`` config +value (falling back to the first installed pet), so this module stays free of +the config loader. +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass +from pathlib import Path + +from hermes_constants import get_hermes_home + +logger = logging.getLogger(__name__) + +_DOWNLOAD_TIMEOUT = 60.0 + + +class PetStoreError(RuntimeError): + """Raised on install/IO failures.""" + + +@dataclass(frozen=True) +class InstalledPet: + """A pet present on disk.""" + + slug: str + display_name: str + description: str + directory: Path + spritesheet: Path + + @property + def exists(self) -> bool: + return self.spritesheet.is_file() + + +def pets_dir() -> Path: + """Return the profile-scoped pets directory (created on demand).""" + path = get_hermes_home() / "pets" + path.mkdir(parents=True, exist_ok=True) + return path + + +def _read_pet_json(directory: Path) -> dict: + pet_json = directory / "pet.json" + if not pet_json.is_file(): + return {} + try: + return json.loads(pet_json.read_text(encoding="utf-8")) + except (OSError, ValueError) as exc: + logger.debug("unreadable pet.json in %s: %s", directory, exc) + return {} + + +def _resolve_spritesheet(directory: Path, meta: dict) -> Path: + """Find the spritesheet for a pet dir. + + Honors ``spritesheetPath`` from pet.json, else probes the conventional + filenames (``spritesheet.{webp,png}`` and petdex R2's ``sprite.webp``). + """ + declared = str(meta.get("spritesheetPath", "") or "").strip() + if declared: + candidate = directory / declared + if candidate.is_file(): + return candidate + for name in ("spritesheet.webp", "spritesheet.png", "sprite.webp", "sprite.png"): + candidate = directory / name + if candidate.is_file(): + return candidate + # Default expectation even if missing, so callers get a stable path. + return directory / "spritesheet.webp" + + +def load_pet(slug: str) -> InstalledPet | None: + """Return the :class:`InstalledPet` for *slug*, or ``None`` if absent.""" + slug = slug.strip() + directory = pets_dir() / slug + if not directory.is_dir(): + return None + meta = _read_pet_json(directory) + return InstalledPet( + slug=slug, + display_name=str(meta.get("displayName", "") or slug), + description=str(meta.get("description", "") or ""), + directory=directory, + spritesheet=_resolve_spritesheet(directory, meta), + ) + + +def installed_pets() -> list[InstalledPet]: + """Return every installed pet (dirs containing a usable spritesheet).""" + out: list[InstalledPet] = [] + for child in sorted(pets_dir().iterdir()): + if not child.is_dir(): + continue + pet = load_pet(child.name) + if pet and pet.exists: + out.append(pet) + return out + + +def resolve_active_pet(configured_slug: str | None = None) -> InstalledPet | None: + """Resolve which pet to display. + + Precedence: the configured slug (``display.pet.slug``) if it's installed, + otherwise the first installed pet alphabetically, otherwise ``None``. + """ + if configured_slug: + pet = load_pet(configured_slug.strip()) + if pet and pet.exists: + return pet + pets = installed_pets() + return pets[0] if pets else None + + +def install_pet(slug: str, *, force: bool = False, timeout: float = _DOWNLOAD_TIMEOUT) -> InstalledPet: + """Download *slug* from the manifest into the pets directory. + + Idempotent: a fully-installed pet is returned as-is unless *force*. Raises + :class:`PetStoreError` / :class:`~agent.pet.manifest.ManifestError` on + failure. + """ + from agent.pet.manifest import find_entry + + slug = slug.strip() + existing = load_pet(slug) + if existing and existing.exists and not force: + return existing + + entry = find_entry(slug, timeout=timeout) + if entry is None: + raise PetStoreError(f"pet '{slug}' is not in the petdex manifest") + + directory = pets_dir() / slug + directory.mkdir(parents=True, exist_ok=True) + + sprite_ext = ".png" if entry.spritesheet_url.lower().split("?")[0].endswith(".png") else ".webp" + sprite_path = directory / f"spritesheet{sprite_ext}" + + _download(entry.spritesheet_url, sprite_path, timeout=timeout) + + # Fetch the upstream pet.json if present; otherwise synthesize a minimal + # one so the local layout is self-describing. + meta: dict = {} + if entry.pet_json_url: + try: + meta = _download_json(entry.pet_json_url, timeout=timeout) + except Exception as exc: # noqa: BLE001 - non-fatal, fall back below + logger.debug("pet.json fetch failed for %s: %s", slug, exc) + if not isinstance(meta, dict) or not meta: + meta = {"id": slug, "displayName": entry.display_name, "description": ""} + meta["spritesheetPath"] = sprite_path.name + meta.setdefault("id", slug) + meta.setdefault("displayName", entry.display_name) + (directory / "pet.json").write_text(json.dumps(meta, indent=2), encoding="utf-8") + + pet = load_pet(slug) + if pet is None or not pet.exists: + raise PetStoreError(f"install of '{slug}' did not produce a spritesheet") + return pet + + +_THUMB_FRAME_W = 192 +_THUMB_FRAME_H = 208 +_THUMB_W = 96 # rendered ~40px; 2x+ keeps it crisp on HiDPI + + +def _thumbs_dir() -> Path: + path = pets_dir() / ".thumbs" + path.mkdir(parents=True, exist_ok=True) + return path + + +def _is_petdex_host(url: str) -> bool: + """True only for petdex.dev hosts — bounds server-side fetch (anti-SSRF).""" + from urllib.parse import urlparse + + try: + host = (urlparse(url).hostname or "").lower() + except ValueError: + return False + return host == "petdex.dev" or host.endswith(".petdex.dev") + + +def thumbnail_png(slug: str, *, source_url: str = "", timeout: float = 30.0) -> bytes | None: + """Return a small idle-frame PNG for *slug*, cached on disk. + + Crops the top-left (idle, frame 0) cell of the spritesheet and downsamples + it to a thumbnail. Source preference: an installed spritesheet on disk, else + *source_url* — but only when it points at petdex (so the gateway never + fetches an arbitrary client-supplied URL). Returns ``None`` when there's no + usable source or Pillow/network fails; callers render a placeholder. + + Doing this server-side sidesteps the renderer's CSP / R2 hotlink limits that + break a direct ```` and lets the result ride the authenticated + gateway as a same-origin data URL. + """ + slug = slug.strip() + if not slug: + return None + + cache = _thumbs_dir() / f"{slug}.png" + if cache.is_file(): + try: + return cache.read_bytes() + except OSError: + pass + + sheet_bytes: bytes | None = None + pet = load_pet(slug) + if pet and pet.exists: + try: + sheet_bytes = pet.spritesheet.read_bytes() + except OSError: + sheet_bytes = None + + if sheet_bytes is None and source_url and _is_petdex_host(source_url): + try: + import httpx + + resp = httpx.get( + source_url, + timeout=timeout, + follow_redirects=True, + headers={"User-Agent": "hermes-agent-petdex"}, + ) + resp.raise_for_status() + sheet_bytes = resp.content + except Exception as exc: # noqa: BLE001 - cosmetic, degrade to placeholder + logger.debug("thumb fetch failed for %s: %s", slug, exc) + + if not sheet_bytes: + return None + + try: + import io + + from PIL import Image + + with Image.open(io.BytesIO(sheet_bytes)) as im: + frame = im.convert("RGBA").crop( + (0, 0, min(_THUMB_FRAME_W, im.width), min(_THUMB_FRAME_H, im.height)) + ) + height = round(_THUMB_W * _THUMB_FRAME_H / _THUMB_FRAME_W) + frame = frame.resize((_THUMB_W, height), Image.NEAREST) + buf = io.BytesIO() + frame.save(buf, format="PNG") + data = buf.getvalue() + except Exception as exc: # noqa: BLE001 + logger.debug("thumb crop failed for %s: %s", slug, exc) + return None + + try: + cache.write_bytes(data) + except OSError: + pass + return data + + +def remove_pet(slug: str) -> bool: + """Delete an installed pet directory. Returns True if anything was removed.""" + import shutil + + directory = pets_dir() / slug.strip() + if not directory.is_dir(): + return False + shutil.rmtree(directory, ignore_errors=True) + return not directory.exists() + + +def _download(url: str, dest: Path, *, timeout: float) -> None: + import httpx + + try: + with httpx.stream( + "GET", + url, + timeout=timeout, + follow_redirects=True, + headers={"User-Agent": "hermes-agent-petdex"}, + ) as resp: + resp.raise_for_status() + tmp = dest.with_suffix(dest.suffix + ".part") + with tmp.open("wb") as fh: + for chunk in resp.iter_bytes(): + fh.write(chunk) + tmp.replace(dest) + except Exception as exc: # noqa: BLE001 + raise PetStoreError(f"download failed for {url}: {exc}") from exc + + +def _download_json(url: str, *, timeout: float) -> dict: + import httpx + + resp = httpx.get( + url, + timeout=timeout, + follow_redirects=True, + headers={"User-Agent": "hermes-agent-petdex"}, + ) + resp.raise_for_status() + data = resp.json() + return data if isinstance(data, dict) else {} diff --git a/hermes_cli/config.py b/hermes_cli/config.py index f698c11d5ac..a557899ae98 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -1633,6 +1633,31 @@ DEFAULT_CONFIG = { "fields": ["model", "context_pct", "cwd"], # Order shown; drop any to hide }, "copy_shortcut": "auto", # "auto" (platform default) | "ctrl_c" | "ctrl_shift_c" | "disabled" + # Petdex animated mascot (https://github.com/crafter-station/petdex). + # A purely cosmetic sprite that reacts to agent activity across the + # CLI, TUI, and desktop app. Manage with `hermes pets`. Disabled until + # a pet is installed + selected (no effect on prompt caching — this is + # a display concern only). + "pet": { + "enabled": False, + # Active pet slug; resolved against installed pets in + # get_hermes_home()/pets/. Empty → first installed pet. + "slug": "", + # Terminal render protocol for CLI/TUI: + # auto — detect kitty/iTerm2/sixel, else unicode half-blocks + # kitty | iterm | sixel | unicode | off + "render_mode": "auto", + # Master size scalar (relative to native 192×208 frames). One knob + # shrinks every surface: the desktop canvas scales its pixels by it + # and the CLI/TUI derive their terminal column width from it. The + # half-block fallback clamps to a legibility floor (it can't shrink + # as far as true-pixel kitty/GUI without turning to mush). + "scale": 0.33, + # Hard override for terminal column width. 0 = auto (derive from + # scale); set a positive int only to pin the half-block/kitty width + # independently of scale. + "unicode_cols": 0, + }, }, # Web dashboard settings diff --git a/tests/agent/test_pet_engine.py b/tests/agent/test_pet_engine.py new file mode 100644 index 00000000000..e7816134133 --- /dev/null +++ b/tests/agent/test_pet_engine.py @@ -0,0 +1,371 @@ +"""Tests for the petdex pet engine (agent/pet/*). + +Behavior/invariant focused — no network, no live manifest. A tiny synthetic +spritesheet is generated with Pillow so render paths exercise real decode +without depending on a downloaded pet. +""" + +from __future__ import annotations + +import io + +import pytest + +from agent.pet import constants, render, state, store +from agent.pet.constants import FRAME_H, FRAME_W, PetState + + +# ───────────────────────────────────────────────────────────────────────── +# state mapping — priority invariants +# ───────────────────────────────────────────────────────────────────────── + +def test_derive_idle_default(): + assert state.derive_pet_state() is PetState.IDLE + # awaiting input uses the dedicated waiting row when available. + assert state.derive_pet_state(awaiting_input=True) is PetState.WAITING + + +def test_derive_priority_order(): + # error beats everything + assert state.derive_pet_state(error=True, celebrate=True, busy=True) is PetState.FAILED + # celebrate beats completion/tool + assert state.derive_pet_state(celebrate=True, just_completed=True, tool_running=True) is PetState.JUMP + # completion beats waiting/tool + assert state.derive_pet_state(just_completed=True, awaiting_input=True) is PetState.WAVE + # waiting (blocked on the user) outranks the in-flight signals — a clarify + # mid-turn pauses on you even though a tool is technically still open. + assert state.derive_pet_state(awaiting_input=True, tool_running=True, busy=True) is PetState.WAITING + # tool beats reasoning + assert state.derive_pet_state(tool_running=True, reasoning=True) is PetState.RUN + # reasoning beats bare-busy + assert state.derive_pet_state(reasoning=True, busy=True) is PetState.REVIEW + # bare busy runs + assert state.derive_pet_state(busy=True) is PetState.RUN + + +def test_todos_all_done(): + # empty / falsy → not done (no plan to celebrate) + assert state.todos_all_done(None) is False + assert state.todos_all_done([]) is False + # any open item → not done + assert state.todos_all_done([{"status": "completed"}, {"status": "pending"}]) is False + assert state.todos_all_done([{"status": "in_progress"}]) is False + # every item terminal → done (completed and/or cancelled) + assert state.todos_all_done([{"status": "completed"}, {"status": "cancelled"}]) is True + + # objects with a .status attr work too (mirrors dict + attr access) + class _T: + def __init__(self, status): + self.status = status + + assert state.todos_all_done([_T("completed")]) is True + assert state.todos_all_done([_T("completed"), _T("pending")]) is False + + +def test_state_row_index_maps_to_supported_atlas_taxonomies(): + # Current Petdex sheets are 8 columns x 9 rows. + assert constants.state_row_index(PetState.IDLE, 9) == 0 + assert constants.state_row_index(PetState.WAVE, 9) == 3 + assert constants.state_row_index(PetState.JUMP, 9) == 4 + assert constants.state_row_index(PetState.FAILED, 9) == 5 + assert constants.state_row_index(PetState.WAITING, 9) == 6 + assert constants.state_row_index(PetState.RUN, 9) == 7 + assert constants.state_row_index(PetState.REVIEW, 9) == 8 + + # Legacy Hermes/petdex sheets were 8 rows with Hermes state names packed in + # order. Keep those readable instead of forcing old installs through the + # newer Codex taxonomy. + assert constants.state_row_index(PetState.WAVE, 8) == 1 + assert constants.state_row_index(PetState.RUN, 8) == 2 + assert constants.state_row_index(PetState.FAILED, 8) == 3 + assert constants.state_row_index(PetState.REVIEW, 8) == 4 + assert constants.state_row_index(PetState.JUMP, 8) == 5 + assert constants.state_row_index(PetState.WAITING, 8) == 0 + + # Alias rows resolve as expected. + assert constants.state_row_index("wave", 9) == constants.state_row_index("waving", 9) == 3 + assert constants.state_row_index("jump", 9) == constants.state_row_index("jumping", 9) == 4 + assert constants.state_row_index("run", 9) == constants.state_row_index("running", 9) == 7 + + # unknown row names clamp to idle (row 0), never raise + assert constants.state_row_index("nonsense") == 0 + + +def test_cols_for_scale_is_monotonic_and_floored(): + # scale is the master size knob: smaller scale never yields more columns, + # and half-blocks clamp to a legibility floor rather than devolving to mush. + sizes = [constants.cols_for_scale(s) for s in (0.1, 0.3, 0.5, 0.7, 1.0, 1.5)] + assert sizes == sorted(sizes) + assert all(c >= constants.UNICODE_MIN_COLS for c in sizes) + # tiny scales pin to the floor; large scales grow past it. + assert constants.cols_for_scale(0.05) == constants.UNICODE_MIN_COLS + assert constants.cols_for_scale(0.33) == constants.UNICODE_MIN_COLS + assert constants.cols_for_scale(2.0) > constants.UNICODE_MIN_COLS + + +def test_resolve_cols_override_else_scale(): + # 0 / falsy → derive from scale; a positive int hard-overrides scale. + assert constants.resolve_cols(0.7, 0) == constants.cols_for_scale(0.7) + assert constants.resolve_cols(0.7, None) == constants.cols_for_scale(0.7) + assert constants.resolve_cols(2.0, 12) == 12 + assert constants.resolve_cols(0.1, -5) == constants.cols_for_scale(0.1) + + +# ───────────────────────────────────────────────────────────────────────── +# synthetic spritesheet fixture +# ───────────────────────────────────────────────────────────────────────── + +@pytest.fixture +def boba_like(tmp_path, monkeypatch): + """Install a synthetic 8-col × 9-row pet into a temp HERMES_HOME.""" + from PIL import Image + + home = tmp_path / ".hermes" + home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(home)) + + cols, rows = 8, 9 + sheet = Image.new("RGBA", (FRAME_W * cols, FRAME_H * rows), (0, 0, 0, 0)) + # paint each row a distinct opaque color so frames are non-empty + for r in range(rows): + color = (20 + r * 25, 60, 120, 255) + for c in range(cols): + block = Image.new("RGBA", (FRAME_W, FRAME_H), color) + sheet.paste(block, (c * FRAME_W, r * FRAME_H)) + + pet_dir = store.pets_dir() / "boba" + pet_dir.mkdir(parents=True, exist_ok=True) + sheet.save(pet_dir / "spritesheet.webp") + (pet_dir / "pet.json").write_text( + '{"id":"boba","displayName":"Boba","description":"d","spritesheetPath":"spritesheet.webp"}' + ) + return pet_dir + + +def test_store_install_resolution(boba_like): + pets = store.installed_pets() + assert [p.slug for p in pets] == ["boba"] + assert store.installed_pets()[0].exists + + # configured slug wins when installed + assert store.resolve_active_pet("boba").slug == "boba" + # bogus slug falls back to first installed + assert store.resolve_active_pet("does-not-exist").slug == "boba" + # display metadata flows from pet.json + assert store.load_pet("boba").display_name == "Boba" + + +def test_store_remove(boba_like): + assert store.remove_pet("boba") is True + assert store.installed_pets() == [] + assert store.remove_pet("boba") is False # idempotent + + +# ───────────────────────────────────────────────────────────────────────── +# render — decode + every encoder produces output +# ───────────────────────────────────────────────────────────────────────── + +def test_renderer_decodes_frames(boba_like): + sprite = store.load_pet("boba").spritesheet + r = render.PetRenderer(str(sprite), mode="unicode", scale=0.5, unicode_cols=12) + assert r.available + # standard sheet yields FRAMES_PER_STATE frames per state + assert r.frame_count("idle") == constants.FRAMES_PER_STATE + assert r.frame_count(PetState.RUN) == constants.FRAMES_PER_STATE + + +def test_trims_trailing_blank_frames(tmp_path): + """Ragged state rows (real frames + transparent padding) trim to real count. + + petdex sheets are left-packed: a state with fewer than FRAMES_PER_STATE real + frames pads the trailing columns transparent. Stepping into one flashes the + pet blank, so the engine must stop the row at the first gap. + """ + from PIL import Image + + cols, rows = 8, 9 + sheet = Image.new("RGBA", (FRAME_W * cols, FRAME_H * rows), (0, 0, 0, 0)) + # row index -> number of real (opaque) frames; the rest stay transparent. + # Codex row taxonomy: idle, running-right, running-left, wave, jump, failed, + # waiting, run, review. + real = {0: 6, 3: 4, 4: 5, 5: 8, 7: 6, 8: 5} + for r, k in real.items(): + for c in range(k): + block = Image.new("RGBA", (FRAME_W, FRAME_H), (200, 80, 80, 255)) + sheet.paste(block, (c * FRAME_W, r * FRAME_H)) + sprite = tmp_path / "ragged.webp" + sheet.save(sprite) + + r = render.PetRenderer(str(sprite), mode="unicode", scale=0.5) + # Full rows cap at FRAMES_PER_STATE; ragged rows trim to their real count. + assert r.frame_count("idle") == constants.FRAMES_PER_STATE + assert r.frame_count("run") == constants.FRAMES_PER_STATE + assert r.frame_count("wave") == 4 + assert r.frame_count("jump") == 5 + assert r.frame_count("failed") == constants.FRAMES_PER_STATE + assert r.frame_count("review") == 5 + + # Every stepped frame is non-empty — no blank flash for the trimmed states. + for state in ("wave", "jump", "review"): + for i in range(r.frame_count(state)): + assert r.frame(state, i), f"{state}[{i}] rendered blank" + + counts = render.state_frame_counts(str(sprite)) + assert counts == { + "idle": 6, + "wave": 4, + "run": 6, + "failed": 6, + "review": 5, + "jump": 5, + "waiting": 0, + } + + +@pytest.mark.parametrize("mode", ["unicode", "kitty", "iterm", "sixel"]) +def test_every_encoder_emits(boba_like, mode): + sprite = store.load_pet("boba").spritesheet + r = render.PetRenderer(str(sprite), mode=mode, scale=0.4) + frame = r.frame("run", 1) + assert isinstance(frame, str) and frame, f"{mode} produced no frame" + if mode == "unicode": + assert "\x1b[" in frame # has color escapes + elif mode == "kitty": + assert frame.startswith("\x1b_G") + elif mode == "iterm": + assert frame.startswith("\x1b]1337;File=") + elif mode == "sixel": + assert frame.startswith("\x1bP") + + +def test_frame_index_wraps(boba_like): + sprite = store.load_pet("boba").spritesheet + r = render.PetRenderer(str(sprite), mode="unicode", scale=0.4) + # index beyond count wraps rather than indexing out of range + assert r.frame("idle", 999) == r.frame("idle", 999 % r.frame_count("idle")) + + +def test_cells_grid_shape(boba_like): + sprite = store.load_pet("boba").spritesheet + r = render.PetRenderer(str(sprite), mode="unicode", scale=0.4, unicode_cols=14) + grid = r.cells("run", 0, cols=14) + assert grid, "no cells produced" + # every row is the requested width; every cell is (top, bottom) RGBA pairs + assert all(len(row) == 14 for row in grid) + (top, bottom) = grid[0][0] + assert len(top) == 4 and len(bottom) == 4 + # missing-sheet renderer yields no cells, never raises + assert render.PetRenderer(str(sprite.parent / "missing.webp"), mode="unicode").cells("idle", 0) == [] + + +# ───────────────────────────────────────────────────────────────────────── +# render — kitty Unicode placeholders (TUI graphics path) +# ───────────────────────────────────────────────────────────────────────── + +def test_kitty_image_id_stable_bounded_nonzero(): + # Deterministic per slug so re-renders reuse the same terminal-side image, + # and always a valid 24-bit-encodable, non-zero id. + a = render.kitty_image_id("boba") + assert a == render.kitty_image_id("boba") + assert 1 <= a <= 0x7FFF + + +def test_kitty_color_hex_decodes_to_id(): + # The placeholder's foreground color IS the image id (24-bit). The terminal + # reconstructs id = (r<<16)|(g<<8)|b, so the hex must round-trip. + for slug in ("boba", "clawd", "pixel-fox"): + image_id = render.kitty_image_id(slug) + h = render.kitty_color_hex(image_id) + assert h.startswith("#") and len(h) == 7 + assert int(h[1:], 16) == image_id + + +def test_kitty_placeholder_rows_grid_contract(): + cols, rows = 18, 10 + grid = render.kitty_placeholder_rows(cols, rows) + assert len(grid) == rows + placeholder = "\U0010eeee" + for r, row in enumerate(grid): + # Each line is exactly `cols` placeholder cells (combining diacritics + # are zero-width, so this is the rendered width Ink must measure). + assert row.count(placeholder) == cols + # First cell carries this row's diacritic; the rest inherit row + col. + assert row.startswith(placeholder + chr(render._ROWCOL_DIACRITICS[r])) + + +def test_kitty_payload_structure(boba_like): + sprite = store.load_pet("boba").spritesheet + image_id = render.kitty_image_id("boba") + scale = 0.4 + r = render.PetRenderer(str(sprite), mode="kitty", scale=scale, unicode_cols=18) + payload = r.kitty_payload("run", image_id=image_id) + assert payload is not None + # placement box must follow scaled pixels, not unicode_cols (kitty upscales to c×r). + frames = r._frames("run") + expect_cols, expect_rows = r._cell_box(frames[0]) + assert payload["cols"] == expect_cols + assert payload["rows"] == expect_rows + assert expect_cols < 18 # 0.4 scale is much smaller than a pinned 18-col box + # placeholder grid matches the requested geometry + assert len(payload["placeholder"]) == payload["rows"] + # one transmit escape per animation frame, each a kitty virtual placement + assert len(payload["frames"]) == r.frame_count("run") + for esc in payload["frames"]: + assert esc.startswith("\x1b_G") + assert esc.endswith("\x1b\\") + assert f"i={image_id}" in esc + assert "a=T" in esc and "U=1" in esc + assert f"c={payload['cols']}" in esc and f"r={payload['rows']}" in esc + + +def test_kitty_payload_none_when_no_frames(tmp_path): + r = render.PetRenderer(str(tmp_path / "missing.webp"), mode="kitty") + assert r.kitty_payload("idle", image_id=1) is None + + +def test_off_mode_and_missing_sheet_degrade(tmp_path): + # off mode never emits + r_off = render.PetRenderer(str(tmp_path / "nope.webp"), mode="off") + assert r_off.frame("idle", 0) == "" + # missing sheet → not available, empty frames, no raise + r_missing = render.PetRenderer(str(tmp_path / "nope.webp"), mode="unicode") + assert not r_missing.available + assert r_missing.frame("idle", 0) == "" + + +def test_resolve_mode_non_tty_is_off(): + # a non-tty stream forces 'off' regardless of configured mode + assert render.resolve_mode("kitty", stream=io.StringIO()) == "off" + assert render.resolve_mode("auto", stream=io.StringIO()) == "off" + + +def test_detect_terminal_graphics_env(monkeypatch): + for key in ("KITTY_WINDOW_ID", "TERM_PROGRAM", "ITERM_SESSION_ID", "WEZTERM_PANE", "TERM"): + monkeypatch.delenv(key, raising=False) + + monkeypatch.setenv("KITTY_WINDOW_ID", "1") + assert render.detect_terminal_graphics() == "kitty" + monkeypatch.delenv("KITTY_WINDOW_ID") + + monkeypatch.setenv("TERM_PROGRAM", "iTerm.app") + assert render.detect_terminal_graphics() == "iterm" + monkeypatch.delenv("TERM_PROGRAM") + + monkeypatch.setenv("TERM", "xterm-256color") + assert render.detect_terminal_graphics() == "unicode" + + +def test_vscode_terminal_ignores_leaked_graphics_env(monkeypatch): + # The VS Code / Cursor integrated terminal can't show inline images by + # default, yet inherits ITERM_SESSION_ID/KITTY_WINDOW_ID when launched from + # those terminals. TERM_PROGRAM=vscode must win → unicode, never a protocol + # whose escapes the embedded terminal would silently drop. + for key in ("KITTY_WINDOW_ID", "TERM_PROGRAM", "ITERM_SESSION_ID", "WEZTERM_PANE", "TERM"): + monkeypatch.delenv(key, raising=False) + monkeypatch.setenv("TERM_PROGRAM", "vscode") + + assert render.detect_terminal_graphics() == "unicode" + for leaked in ("ITERM_SESSION_ID", "KITTY_WINDOW_ID", "WEZTERM_PANE"): + monkeypatch.setenv(leaked, "1") + assert render.detect_terminal_graphics() == "unicode" + monkeypatch.delenv(leaked)