mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-27 11:22:03 +00:00
feat(pets): pet engine + display.pet config
Add the shared pet engine under agent/pet/: spritesheet manifest loading and in-process caching, six-state animation model, frame rendering, and the persistent pet store. Register the display.pet config block (pet, scale, enabled, etc.) that every surface reads from. Covered by tests/agent/test_pet_engine.py.
This commit is contained in:
parent
03d9a95a74
commit
e7dbfdaad7
8 changed files with 1757 additions and 0 deletions
51
agent/pet/__init__.py
Normal file
51
agent/pet/__init__.py
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
"""Petdex pet engine — shared core for the CLI, TUI, and desktop surfaces.
|
||||
|
||||
Petdex (https://github.com/crafter-station/petdex) is a public gallery of
|
||||
animated sprite "pets" for coding agents. Each pet is a ``pet.json`` plus a
|
||||
``spritesheet.{webp,png}`` of 192×208 px cells. Current Codex/petdex sheets use
|
||||
an 8-column × 9-row atlas; older Hermes/petdex sheets used an 8-row atlas.
|
||||
Hermes infers the row taxonomy from the sheet and maps agent activity onto
|
||||
idle/run/review/failed/wave/jump.
|
||||
|
||||
This package is the **single source of truth** for the feature so the base
|
||||
CLI (Python) and TUI (Ink, via ``tui_gateway``) never duplicate the hard
|
||||
parts:
|
||||
|
||||
- :mod:`agent.pet.constants` — frame geometry + the :class:`PetState` enum.
|
||||
- :mod:`agent.pet.state` — map agent activity → a :class:`PetState`.
|
||||
- :mod:`agent.pet.manifest` — fetch the public petdex manifest.
|
||||
- :mod:`agent.pet.store` — install / list / resolve pets on disk
|
||||
(profile-aware via ``get_hermes_home()``).
|
||||
- :mod:`agent.pet.render` — decode a spritesheet and encode frames for a
|
||||
terminal (kitty / iTerm2 / sixel graphics
|
||||
protocols, with a Unicode half-block
|
||||
fallback).
|
||||
|
||||
Rendering in the Electron desktop is necessarily TypeScript (canvas), but it
|
||||
reuses the same on-disk store and the same state semantics.
|
||||
|
||||
The whole feature is a *display* concern: it adds no model tool, mutates no
|
||||
system prompt or toolset, and therefore has zero effect on prompt caching.
|
||||
"""
|
||||
|
||||
from agent.pet.constants import (
|
||||
DEFAULT_SCALE,
|
||||
FRAME_H,
|
||||
FRAME_W,
|
||||
FRAMES_PER_STATE,
|
||||
LOOP_MS,
|
||||
STATE_ROWS,
|
||||
PetState,
|
||||
)
|
||||
from agent.pet.state import derive_pet_state
|
||||
|
||||
__all__ = [
|
||||
"DEFAULT_SCALE",
|
||||
"FRAME_H",
|
||||
"FRAME_W",
|
||||
"FRAMES_PER_STATE",
|
||||
"LOOP_MS",
|
||||
"STATE_ROWS",
|
||||
"PetState",
|
||||
"derive_pet_state",
|
||||
]
|
||||
167
agent/pet/constants.py
Normal file
167
agent/pet/constants.py
Normal file
|
|
@ -0,0 +1,167 @@
|
|||
"""Pet sprite geometry + animation-state taxonomy.
|
||||
|
||||
These values are the common petdex/Codex pet geometry. The real ``pet.json``
|
||||
usually only carries ``id``/``displayName``/``description``/``spritesheetPath``;
|
||||
row taxonomy is inferred from the atlas shape so Hermes can render both legacy
|
||||
8-row sheets and current 9-row Codex sheets.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum
|
||||
|
||||
# Frame geometry (pixels). Current Codex/petdex spritesheets are 8 columns x 9
|
||||
# rows (1536x1872), while older Hermes/petdex sheets used 9 columns x 8 rows
|
||||
# (1728x1664). Renderers derive both row taxonomy and real column count from the
|
||||
# concrete sheet, so either shape works.
|
||||
FRAME_W = 192
|
||||
FRAME_H = 208
|
||||
|
||||
# Frames consumed per animation state (the petdex web app uses CSS
|
||||
# ``steps(6)``). A sheet may physically contain more columns; we only step
|
||||
# through the first ``FRAMES_PER_STATE``.
|
||||
FRAMES_PER_STATE = 6
|
||||
|
||||
# Full-loop duration for one state, milliseconds (petdex default).
|
||||
LOOP_MS = 1100
|
||||
|
||||
# Default on-screen scale relative to native frame size. ``display.pet.scale``
|
||||
# is the single master scalar: the desktop canvas multiplies its native pixels
|
||||
# by it and every terminal surface derives its half-block/kitty column width
|
||||
# from it (see :func:`cols_for_scale`), so one number shrinks all three
|
||||
# interfaces together. (petdex's own clients render at 0.7; we default smaller
|
||||
# so the kitty/GUI mascot stays a glanceable corner sprite. The half-block
|
||||
# fallback can't shrink as far — see ``UNICODE_MIN_COLS`` — and clamps to its
|
||||
# legibility floor instead.)
|
||||
DEFAULT_SCALE = 0.33
|
||||
|
||||
# User-settable scale bounds (``/pet scale``, desktop slider). Floor keeps the
|
||||
# pet clickable/visible; ceiling stops a fat-fingered value from filling the
|
||||
# screen. The unicode fallback additionally clamps to ``UNICODE_MIN_COLS``.
|
||||
MIN_SCALE = 0.1
|
||||
MAX_SCALE = 3.0
|
||||
|
||||
|
||||
def clamp_scale(scale: float) -> float:
|
||||
"""Clamp *scale* to ``[MIN_SCALE, MAX_SCALE]`` (the single validation point)."""
|
||||
return max(MIN_SCALE, min(MAX_SCALE, scale))
|
||||
|
||||
# Terminal cells one native frame spans at ``scale == 1.0``. A cell is ~8px
|
||||
# wide, a frame is ``FRAME_W`` (192) px → 24 cells. This mirrors the kitty
|
||||
# graphics placement (``scaled_px // 8``) so at full scale every renderer agrees.
|
||||
BASE_UNICODE_COLS = FRAME_W // 8
|
||||
|
||||
# Legibility floor for the half-block fallback. A half-block cell samples the
|
||||
# sprite at only 1 horizontal + 2 vertical taps, so below this width a 192×208
|
||||
# pet collapses into an unreadable blob *regardless* of scale. kitty/GUI draw
|
||||
# true pixels and have no such floor — that's why the same ``scale: 0.33`` is
|
||||
# crisp there but mush in half-blocks. ``scale`` shrinks the unicode pet down
|
||||
# TO this floor (and grows it above), instead of past it into noise.
|
||||
UNICODE_MIN_COLS = 16
|
||||
|
||||
|
||||
def cols_for_scale(scale: float) -> int:
|
||||
"""Half-block width implied by *scale*, clamped to the legibility floor.
|
||||
|
||||
Above the floor it tracks the kitty cell box (``scaled_px // 8``) so the two
|
||||
renderers converge at larger sizes; below it the floor keeps the sprite
|
||||
readable rather than letting it devolve into a blob.
|
||||
"""
|
||||
return max(UNICODE_MIN_COLS, round(BASE_UNICODE_COLS * (scale or DEFAULT_SCALE)))
|
||||
|
||||
|
||||
def resolve_cols(scale: float, unicode_cols: int = 0) -> int:
|
||||
"""Resolve terminal width: explicit *unicode_cols* override, else from *scale*."""
|
||||
return int(unicode_cols) if unicode_cols and int(unicode_cols) > 0 else cols_for_scale(scale)
|
||||
|
||||
|
||||
class PetState(str, Enum):
|
||||
"""Animation state a pet can be shown in.
|
||||
|
||||
These are Hermes' activity state names. They are not always identical to the
|
||||
source atlas row names: Codex-format pets use rows like ``jumping`` /
|
||||
``running`` while the UI keeps the shorter ``jump`` / ``run`` names.
|
||||
"""
|
||||
|
||||
IDLE = "idle"
|
||||
WAVE = "wave"
|
||||
RUN = "run"
|
||||
FAILED = "failed"
|
||||
REVIEW = "review"
|
||||
JUMP = "jump"
|
||||
WAITING = "waiting"
|
||||
|
||||
|
||||
# Legacy Hermes/petdex row order (top -> bottom) used by the older 8-row,
|
||||
# 9-column atlas shape.
|
||||
LEGACY_STATE_ROWS: list[str] = [
|
||||
PetState.IDLE.value,
|
||||
PetState.WAVE.value,
|
||||
PetState.RUN.value,
|
||||
PetState.FAILED.value,
|
||||
PetState.REVIEW.value,
|
||||
PetState.JUMP.value,
|
||||
"extra1",
|
||||
"extra2",
|
||||
]
|
||||
|
||||
# Current Petdex row order (top -> bottom) used by 1536x1872 atlases:
|
||||
# 8 columns x 9 rows of 192x208 cells.
|
||||
CODEX_STATE_ROWS: list[str] = [
|
||||
PetState.IDLE.value,
|
||||
"running-right",
|
||||
"running-left",
|
||||
"waving",
|
||||
"jumping",
|
||||
PetState.FAILED.value,
|
||||
PetState.WAITING.value,
|
||||
"running",
|
||||
PetState.REVIEW.value,
|
||||
]
|
||||
|
||||
# Default/fallback for callers without a sheet. Prefer the current 9-row Codex
|
||||
# format because generated pets and the public Codex pet contract use it.
|
||||
STATE_ROWS: list[str] = CODEX_STATE_ROWS
|
||||
|
||||
# Canonical Hermes activity names -> accepted row-name aliases in descending
|
||||
# preference. This keeps our internal state names stable (`wave`/`jump`/`run`)
|
||||
# while matching Petdex's current `waving`/`jumping`/`running` taxonomy.
|
||||
STATE_ALIASES: dict[str, tuple[str, ...]] = {
|
||||
PetState.IDLE.value: (PetState.IDLE.value,),
|
||||
PetState.WAVE.value: (PetState.WAVE.value, "waving"),
|
||||
PetState.JUMP.value: (PetState.JUMP.value, "jumping"),
|
||||
PetState.RUN.value: (PetState.RUN.value, "running"),
|
||||
PetState.FAILED.value: (PetState.FAILED.value,),
|
||||
PetState.REVIEW.value: (PetState.REVIEW.value,),
|
||||
PetState.WAITING.value: (PetState.WAITING.value,),
|
||||
}
|
||||
|
||||
|
||||
def state_aliases_for(state: "PetState | str") -> tuple[str, ...]:
|
||||
"""Return accepted row-name aliases for *state* (always non-empty)."""
|
||||
value = state.value if isinstance(state, PetState) else str(state)
|
||||
aliases = STATE_ALIASES.get(value)
|
||||
return aliases if aliases else (value,)
|
||||
|
||||
|
||||
def state_rows_for_grid(row_count: int | None) -> list[str]:
|
||||
"""Return the row taxonomy for a spritesheet with *row_count* rows."""
|
||||
try:
|
||||
rows = int(row_count or 0)
|
||||
except (TypeError, ValueError):
|
||||
rows = 0
|
||||
|
||||
if rows >= len(CODEX_STATE_ROWS):
|
||||
return CODEX_STATE_ROWS
|
||||
return LEGACY_STATE_ROWS
|
||||
|
||||
|
||||
def state_row_index(state: "PetState | str", row_count: int | None = None) -> int:
|
||||
"""Return the spritesheet row index for *state* (clamped, never raises)."""
|
||||
rows = state_rows_for_grid(row_count)
|
||||
for name in state_aliases_for(state):
|
||||
try:
|
||||
return rows.index(name)
|
||||
except ValueError:
|
||||
continue
|
||||
return 0 # fall back to the idle row
|
||||
128
agent/pet/manifest.py
Normal file
128
agent/pet/manifest.py
Normal file
|
|
@ -0,0 +1,128 @@
|
|||
"""Fetch the public petdex manifest.
|
||||
|
||||
``https://petdex.dev/api/manifest`` 307-redirects to a JSON document on R2:
|
||||
|
||||
{
|
||||
"generatedAt": "...",
|
||||
"total": 2926,
|
||||
"pets": [
|
||||
{"slug": "boba", "displayName": "Boba", "kind": "creature",
|
||||
"submittedBy": "railly",
|
||||
"spritesheetUrl": "https://assets.petdex.dev/.../spritesheet.webp",
|
||||
"petJsonUrl": "https://assets.petdex.dev/.../pet.json",
|
||||
"zipUrl": "https://assets.petdex.dev/.../boba.zip"},
|
||||
...
|
||||
]
|
||||
}
|
||||
|
||||
Read-only and unauthenticated; no credentials involved.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MANIFEST_URL = "https://petdex.dev/api/manifest"
|
||||
|
||||
_DEFAULT_TIMEOUT = 20.0
|
||||
|
||||
# In-process cache for the (large, slow, identical-per-call) manifest. The list
|
||||
# is a static CDN object that barely changes, yet a single session can ask for
|
||||
# it many times — every gallery open, plus a full re-fetch per install/select
|
||||
# (``find_entry``). A short TTL collapses those into one network hit without
|
||||
# going stale for long. Cleared by :func:`clear_cache` (tests).
|
||||
_MANIFEST_TTL = 300.0
|
||||
_cache: tuple[float, list[ManifestEntry]] | None = None
|
||||
|
||||
|
||||
def clear_cache() -> None:
|
||||
"""Drop the cached manifest (forces the next fetch to hit the network)."""
|
||||
global _cache
|
||||
_cache = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ManifestEntry:
|
||||
"""A single pet's row in the manifest."""
|
||||
|
||||
slug: str
|
||||
display_name: str
|
||||
kind: str
|
||||
submitted_by: str
|
||||
spritesheet_url: str
|
||||
pet_json_url: str
|
||||
zip_url: str
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict) -> "ManifestEntry":
|
||||
return cls(
|
||||
slug=str(data.get("slug", "")).strip(),
|
||||
display_name=str(data.get("displayName", "") or data.get("slug", "")),
|
||||
kind=str(data.get("kind", "") or "pet"),
|
||||
submitted_by=str(data.get("submittedBy", "") or ""),
|
||||
spritesheet_url=str(data.get("spritesheetUrl", "") or ""),
|
||||
pet_json_url=str(data.get("petJsonUrl", "") or ""),
|
||||
zip_url=str(data.get("zipUrl", "") or ""),
|
||||
)
|
||||
|
||||
|
||||
class ManifestError(RuntimeError):
|
||||
"""Raised when the manifest can't be fetched or parsed."""
|
||||
|
||||
|
||||
def fetch_manifest(*, timeout: float = _DEFAULT_TIMEOUT, force: bool = False) -> list[ManifestEntry]:
|
||||
"""Return every approved pet from the public manifest.
|
||||
|
||||
Cached in-process for ``_MANIFEST_TTL`` seconds (pass ``force=True`` to
|
||||
bypass). Follows the 307 redirect to R2. Raises :class:`ManifestError` on
|
||||
any network/parse failure so callers can surface a clean message.
|
||||
"""
|
||||
global _cache
|
||||
|
||||
if not force and _cache is not None and time.monotonic() - _cache[0] < _MANIFEST_TTL:
|
||||
return _cache[1]
|
||||
|
||||
try:
|
||||
import httpx
|
||||
except ImportError as exc: # pragma: no cover - httpx is a core dep
|
||||
raise ManifestError("httpx is required to fetch the petdex manifest") from exc
|
||||
|
||||
try:
|
||||
resp = httpx.get(
|
||||
MANIFEST_URL,
|
||||
timeout=timeout,
|
||||
follow_redirects=True,
|
||||
headers={"User-Agent": "hermes-agent-petdex"},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
payload = resp.json()
|
||||
except Exception as exc: # noqa: BLE001 - normalize to one error type
|
||||
raise ManifestError(f"could not fetch petdex manifest: {exc}") from exc
|
||||
|
||||
pets = payload.get("pets") if isinstance(payload, dict) else None
|
||||
if not isinstance(pets, list):
|
||||
raise ManifestError("petdex manifest had no 'pets' array")
|
||||
|
||||
entries: list[ManifestEntry] = []
|
||||
for raw in pets:
|
||||
if not isinstance(raw, dict):
|
||||
continue
|
||||
entry = ManifestEntry.from_dict(raw)
|
||||
if entry.slug and entry.spritesheet_url:
|
||||
entries.append(entry)
|
||||
|
||||
_cache = (time.monotonic(), entries)
|
||||
return entries
|
||||
|
||||
|
||||
def find_entry(slug: str, *, timeout: float = _DEFAULT_TIMEOUT) -> ManifestEntry | None:
|
||||
"""Return the manifest entry for *slug*, or ``None`` if not listed."""
|
||||
slug = slug.strip().lower()
|
||||
for entry in fetch_manifest(timeout=timeout):
|
||||
if entry.slug.lower() == slug:
|
||||
return entry
|
||||
return None
|
||||
618
agent/pet/render.py
Normal file
618
agent/pet/render.py
Normal file
|
|
@ -0,0 +1,618 @@
|
|||
"""Decode a pet spritesheet and encode frames for a terminal.
|
||||
|
||||
Shared by the base CLI (writes the escape bytes to its own stdout) and the
|
||||
TUI (``tui_gateway`` ships the encoded bytes to Ink, which writes them) so the
|
||||
decode + capability-detection + protocol-encoding logic exists exactly once.
|
||||
|
||||
Supported output modes, in fidelity order:
|
||||
|
||||
- ``kitty`` — the kitty graphics protocol (kitty, Ghostty, WezTerm).
|
||||
- ``iterm`` — iTerm2 inline images (iTerm2, WezTerm).
|
||||
- ``sixel`` — DEC sixel (xterm -ti vt340, foot, mlterm, WezTerm, …).
|
||||
- ``unicode`` — 24-bit half-block downscale; works in any truecolor terminal.
|
||||
|
||||
Frame decoding requires Pillow (a core Hermes dependency). If Pillow or the
|
||||
spritesheet is unavailable the renderer degrades to ``unicode`` text or an
|
||||
empty string rather than raising.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
|
||||
from agent.pet.constants import (
|
||||
DEFAULT_SCALE,
|
||||
FRAME_H,
|
||||
FRAME_W,
|
||||
FRAMES_PER_STATE,
|
||||
PetState,
|
||||
state_row_index,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Public render-mode names accepted by ``display.pet.render_mode``.
|
||||
RENDER_MODES = ("auto", "kitty", "iterm", "sixel", "unicode", "off")
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# Terminal capability detection
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def detect_terminal_graphics() -> str:
|
||||
"""Best-effort detection of the richest graphics protocol available.
|
||||
|
||||
Env-based (non-blocking — we never issue a DA1/terminal query that could
|
||||
hang a pipe). Returns one of ``kitty`` / ``iterm`` / ``sixel`` /
|
||||
``unicode``. Conservative: unknown terminals get ``unicode``, which works
|
||||
anywhere with truecolor.
|
||||
"""
|
||||
term = os.environ.get("TERM", "").lower()
|
||||
term_program = os.environ.get("TERM_PROGRAM", "").lower()
|
||||
|
||||
# The VS Code / Cursor integrated terminal sets TERM_PROGRAM=vscode
|
||||
# authoritatively but does NOT scrub the terminal env vars it inherits when
|
||||
# launched from another emulator (ITERM_SESSION_ID, KITTY_WINDOW_ID, …).
|
||||
# Trusting those leaks emits an image protocol the embedded xterm.js can't
|
||||
# display — you get a blank frame. Inline images there are opt-in
|
||||
# (terminal.integrated.enableImages), so default to half-blocks, which
|
||||
# always render in its truecolor grid. Users who enabled images can pin
|
||||
# display.pet.render_mode explicitly.
|
||||
if term_program == "vscode":
|
||||
return "unicode"
|
||||
|
||||
# kitty graphics protocol
|
||||
if os.environ.get("KITTY_WINDOW_ID") or "kitty" in term or "ghostty" in term:
|
||||
return "kitty"
|
||||
if term_program in {"ghostty"}:
|
||||
return "kitty"
|
||||
|
||||
# WezTerm speaks both kitty and iterm; prefer kitty (richer placement).
|
||||
if term_program == "wezterm" or os.environ.get("WEZTERM_PANE"):
|
||||
return "kitty"
|
||||
|
||||
# iTerm2 inline images
|
||||
if term_program == "iterm.app" or os.environ.get("ITERM_SESSION_ID"):
|
||||
return "iterm"
|
||||
|
||||
# sixel-capable terminals (env heuristics only)
|
||||
if term_program in {"mintty"} or "foot" in term or "mlterm" in term:
|
||||
return "sixel"
|
||||
if "sixel" in term:
|
||||
return "sixel"
|
||||
|
||||
return "unicode"
|
||||
|
||||
|
||||
def resolve_mode(configured: str | None, *, stream=None) -> str:
|
||||
"""Resolve the effective render mode from config + the environment.
|
||||
|
||||
``configured`` is ``display.pet.render_mode`` (``auto`` → detect). Returns
|
||||
``off`` when not attached to a TTY (no point emitting graphics into a pipe
|
||||
or logfile).
|
||||
"""
|
||||
mode = (configured or "auto").strip().lower()
|
||||
if mode not in RENDER_MODES:
|
||||
mode = "auto"
|
||||
if mode == "off":
|
||||
return "off"
|
||||
|
||||
stream = stream or sys.stdout
|
||||
try:
|
||||
if not (hasattr(stream, "isatty") and stream.isatty()):
|
||||
return "off"
|
||||
except (ValueError, OSError):
|
||||
return "off"
|
||||
|
||||
if mode == "auto":
|
||||
return detect_terminal_graphics()
|
||||
return mode
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# Frame decoding
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _open_sheet(path: Path):
|
||||
from PIL import Image
|
||||
|
||||
img = Image.open(path)
|
||||
return img.convert("RGBA")
|
||||
|
||||
|
||||
# Max alpha at/below which a frame counts as blank padding. petdex sheets are
|
||||
# left-packed: a state with fewer real frames than ``FRAMES_PER_STATE`` fills
|
||||
# the trailing columns with fully transparent cells. Animating into one flashes
|
||||
# the pet blank, so we stop the row at the first such gap.
|
||||
_BLANK_ALPHA = 8
|
||||
|
||||
|
||||
def _frame_is_blank(frame) -> bool:
|
||||
"""True if *frame* has no meaningfully opaque pixel (transparent padding)."""
|
||||
return frame.getchannel("A").getextrema()[1] <= _BLANK_ALPHA
|
||||
|
||||
|
||||
@lru_cache(maxsize=16)
|
||||
def _raw_frames(
|
||||
sheet_path: str,
|
||||
state_value: str,
|
||||
frame_w: int,
|
||||
frame_h: int,
|
||||
frames_per_state: int,
|
||||
) -> tuple:
|
||||
"""Cropped, padding-trimmed RGBA frames for one state row (unscaled).
|
||||
|
||||
Steps across the row until the first blank column so pets with ragged
|
||||
per-state frame counts never animate into empty padding. Cached; returns
|
||||
``()`` on any decode failure.
|
||||
"""
|
||||
try:
|
||||
sheet = _open_sheet(Path(sheet_path))
|
||||
cols = max(1, sheet.width // frame_w)
|
||||
rows = max(1, sheet.height // frame_h)
|
||||
row = state_row_index(state_value, rows)
|
||||
top = row * frame_h
|
||||
# Clamp the row to the sheet (some pets ship fewer rows than the 8 the
|
||||
# taxonomy reserves).
|
||||
if top + frame_h > sheet.height:
|
||||
top = max(0, sheet.height - frame_h)
|
||||
|
||||
frames = []
|
||||
for i in range(min(frames_per_state, cols)):
|
||||
left = i * frame_w
|
||||
frame = sheet.crop((left, top, left + frame_w, top + frame_h))
|
||||
if _frame_is_blank(frame):
|
||||
break # trailing transparent padding — real frames end here
|
||||
frames.append(frame)
|
||||
return tuple(frames)
|
||||
except Exception as exc: # noqa: BLE001 - cosmetic feature, never fatal
|
||||
logger.debug("pet frame decode failed (%s, %s): %s", sheet_path, state_value, exc)
|
||||
return ()
|
||||
|
||||
|
||||
@lru_cache(maxsize=8)
|
||||
def _frames_for(
|
||||
sheet_path: str,
|
||||
state_value: str,
|
||||
frame_w: int,
|
||||
frame_h: int,
|
||||
frames_per_state: int,
|
||||
scale_w: int,
|
||||
scale_h: int,
|
||||
):
|
||||
"""Return padding-trimmed RGBA frames for one state row, scaled.
|
||||
|
||||
Thin scaling layer over :func:`_raw_frames`; both are cached so repeated
|
||||
frame requests during animation are free.
|
||||
"""
|
||||
raw = _raw_frames(sheet_path, state_value, frame_w, frame_h, frames_per_state)
|
||||
if not raw or (scale_w, scale_h) == (frame_w, frame_h):
|
||||
return list(raw)
|
||||
from PIL import Image
|
||||
|
||||
return [f.resize((scale_w, scale_h), Image.LANCZOS) for f in raw]
|
||||
|
||||
|
||||
def state_frame_counts(
|
||||
sheet_path: str | Path,
|
||||
*,
|
||||
frame_w: int = FRAME_W,
|
||||
frame_h: int = FRAME_H,
|
||||
frames_per_state: int = FRAMES_PER_STATE,
|
||||
) -> dict[str, int]:
|
||||
"""Map each driven :class:`PetState` → its real (padding-trimmed) frame count.
|
||||
|
||||
The single source of truth for "how many frames does this state actually
|
||||
have?". The CLI/TUI consume the trimmed frame lists directly; the gateway
|
||||
ships this map to the desktop canvas, which steps its own loop.
|
||||
"""
|
||||
return {
|
||||
state.value: len(
|
||||
_raw_frames(str(sheet_path), state.value, frame_w, frame_h, frames_per_state)
|
||||
)
|
||||
for state in PetState
|
||||
}
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# Encoders
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _png_bytes(frame) -> bytes:
|
||||
buf = io.BytesIO()
|
||||
frame.save(buf, format="PNG")
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
def _kitty_apc(ctrl: str, data: str) -> str:
|
||||
"""Emit a kitty APC escape for *data*, chunked into ≤4096-byte ``m`` pieces."""
|
||||
chunk = 4096
|
||||
if len(data) <= chunk:
|
||||
return f"\x1b_G{ctrl},m=0;{data}\x1b\\"
|
||||
out = [f"\x1b_G{ctrl},m=1;{data[:chunk]}\x1b\\"]
|
||||
rest = data[chunk:]
|
||||
while rest:
|
||||
piece, rest = rest[:chunk], rest[chunk:]
|
||||
out.append(f"\x1b_Gm={1 if rest else 0};{piece}\x1b\\")
|
||||
return "".join(out)
|
||||
|
||||
|
||||
def _encode_kitty(frame, *, cell_cols: int | None = None, cell_rows: int | None = None) -> str:
|
||||
"""Encode one frame via the kitty graphics protocol (transmit + display).
|
||||
|
||||
``a=T`` transmits & displays at the cursor; ``c``/``r`` request a display
|
||||
box in terminal cells so successive frames overwrite the same area.
|
||||
"""
|
||||
ctrl = "f=100,a=T,q=2"
|
||||
if cell_cols:
|
||||
ctrl += f",c={cell_cols}"
|
||||
if cell_rows:
|
||||
ctrl += f",r={cell_rows}"
|
||||
return _kitty_apc(ctrl, base64.standard_b64encode(_png_bytes(frame)).decode("ascii"))
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# kitty Unicode placeholders
|
||||
#
|
||||
# Ink (the TUI's React-for-terminal layer) owns the screen and measures every
|
||||
# cell's width, so it can't host raw kitty image escapes (no width to count,
|
||||
# clobbered on the next repaint). kitty's *Unicode placeholder* protocol is the
|
||||
# grid-safe path: transmit the image once (q=2, virtual placement U=1), then the
|
||||
# host app prints ordinary-width placeholder cells (U+10EEEE + diacritics) whose
|
||||
# foreground color encodes the image id. Ink counts those as width-1 text, so
|
||||
# layout stays correct and the terminal paints the image underneath.
|
||||
# https://sw.kovidgoyal.net/kitty/graphics-protocol/#unicode-placeholders
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
_KITTY_PLACEHOLDER = "\U0010eeee"
|
||||
|
||||
# Row/column diacritics, in order (index → diacritic). Verbatim from kitty's
|
||||
# gen/rowcolumn-diacritics.txt (Unicode 6.0.0, combining class 230). Index i is
|
||||
# the diacritic that encodes the number i; we only ever need the row index.
|
||||
_ROWCOL_DIACRITICS: tuple[int, ...] = (
|
||||
0x0305, 0x030D, 0x030E, 0x0310, 0x0312, 0x033D, 0x033E, 0x033F, 0x0346, 0x034A,
|
||||
0x034B, 0x034C, 0x0350, 0x0351, 0x0352, 0x0357, 0x035B, 0x0363, 0x0364, 0x0365,
|
||||
0x0366, 0x0367, 0x0368, 0x0369, 0x036A, 0x036B, 0x036C, 0x036D, 0x036E, 0x036F,
|
||||
0x0483, 0x0484, 0x0485, 0x0486, 0x0487, 0x0592, 0x0593, 0x0594, 0x0595, 0x0597,
|
||||
0x0598, 0x0599, 0x059C, 0x059D, 0x059E, 0x059F, 0x05A0, 0x05A1, 0x05A8, 0x05A9,
|
||||
0x05AB, 0x05AC, 0x05AF, 0x05C4, 0x0610, 0x0611, 0x0612, 0x0613, 0x0614, 0x0615,
|
||||
0x0616, 0x0617, 0x0657, 0x0658, 0x0659, 0x065A, 0x065B, 0x065D, 0x065E, 0x06D6,
|
||||
0x06D7, 0x06D8, 0x06D9, 0x06DA, 0x06DB, 0x06DC, 0x06DF, 0x06E0, 0x06E1, 0x06E2,
|
||||
0x06E4, 0x06E7, 0x06E8, 0x06EB, 0x06EC, 0x0730, 0x0732, 0x0733, 0x0735, 0x0736,
|
||||
0x073A, 0x073D, 0x073F, 0x0740, 0x0741, 0x0743, 0x0745, 0x0747, 0x0749, 0x074A,
|
||||
0x07EB, 0x07EC, 0x07ED, 0x07EE, 0x07EF, 0x07F0, 0x07F1, 0x07F3, 0x0816, 0x0817,
|
||||
0x0818, 0x0819, 0x081B, 0x081C, 0x081D, 0x081E, 0x081F, 0x0820, 0x0821, 0x0822,
|
||||
0x0823, 0x0825, 0x0826, 0x0827, 0x0829, 0x082A, 0x082B, 0x082C, 0x082D, 0x0951,
|
||||
0x0953, 0x0954, 0x0F82, 0x0F83, 0x0F86, 0x0F87, 0x135D, 0x135E, 0x135F, 0x17DD,
|
||||
0x193A, 0x1A17, 0x1A75, 0x1A76, 0x1A77, 0x1A78, 0x1A79, 0x1A7A, 0x1A7B, 0x1A7C,
|
||||
0x1B6B, 0x1B6D, 0x1B6E, 0x1B6F, 0x1B70, 0x1B71, 0x1B72, 0x1B73, 0x1CD0, 0x1CD1,
|
||||
0x1CD2, 0x1CDA, 0x1CDB, 0x1CE0, 0x1DC0, 0x1DC1, 0x1DC3, 0x1DC4, 0x1DC5, 0x1DC6,
|
||||
0x1DC7, 0x1DC8, 0x1DC9, 0x1DCB, 0x1DCC, 0x1DD1, 0x1DD2, 0x1DD3, 0x1DD4, 0x1DD5,
|
||||
0x1DD6, 0x1DD7, 0x1DD8, 0x1DD9, 0x1DDA, 0x1DDB, 0x1DDC, 0x1DDD, 0x1DDE, 0x1DDF,
|
||||
0x1DE0, 0x1DE1, 0x1DE2, 0x1DE3, 0x1DE4, 0x1DE5, 0x1DE6, 0x1DFE, 0x20D0, 0x20D1,
|
||||
0x20D4, 0x20D5, 0x20D6, 0x20D7, 0x20DB, 0x20DC, 0x20E1, 0x20E7, 0x20E9, 0x20F0,
|
||||
0x2CEF, 0x2CF0, 0x2CF1, 0x2DE0, 0x2DE1, 0x2DE2, 0x2DE3, 0x2DE4, 0x2DE5, 0x2DE6,
|
||||
0x2DE7, 0x2DE8, 0x2DE9, 0x2DEA, 0x2DEB, 0x2DEC, 0x2DED, 0x2DEE, 0x2DEF, 0x2DF0,
|
||||
0x2DF1, 0x2DF2, 0x2DF3, 0x2DF4, 0x2DF5, 0x2DF6, 0x2DF7, 0x2DF8, 0x2DF9, 0x2DFA,
|
||||
0x2DFB, 0x2DFC, 0x2DFD, 0x2DFE, 0x2DFF, 0xA66F, 0xA67C, 0xA67D, 0xA6F0, 0xA6F1,
|
||||
0xA8E0, 0xA8E1, 0xA8E2, 0xA8E3, 0xA8E4, 0xA8E5, 0xA8E6, 0xA8E7, 0xA8E8, 0xA8E9,
|
||||
0xA8EA, 0xA8EB, 0xA8EC, 0xA8ED, 0xA8EE, 0xA8EF, 0xA8F0, 0xA8F1, 0xAAB0, 0xAAB2,
|
||||
0xAAB3, 0xAAB7, 0xAAB8, 0xAABE, 0xAABF, 0xAAC1, 0xFE20, 0xFE21, 0xFE22, 0xFE23,
|
||||
0xFE24, 0xFE25, 0xFE26, 0x10A0F, 0x10A38, 0x1D185, 0x1D186, 0x1D187, 0x1D188,
|
||||
0x1D189, 0x1D1AA, 0x1D1AB, 0x1D1AC, 0x1D1AD, 0x1D242, 0x1D243, 0x1D244,
|
||||
)
|
||||
|
||||
|
||||
def kitty_image_id(slug: str) -> int:
|
||||
"""Stable per-pet image id in ``[1, 0x7FFF]``.
|
||||
|
||||
The id is encoded in the placeholder's 24-bit foreground color, so it must
|
||||
be non-zero and fit comfortably under ``0xFFFFFF``. A small CRC keeps it
|
||||
deterministic per slug (so re-renders reuse the same terminal-side image)
|
||||
while making collisions between two different pets unlikely.
|
||||
"""
|
||||
import zlib
|
||||
|
||||
return (zlib.crc32(slug.encode("utf-8")) % 0x7FFE) + 1
|
||||
|
||||
|
||||
def kitty_color_hex(image_id: int) -> str:
|
||||
"""Hex foreground color (``#rrggbb``) that encodes *image_id* for kitty."""
|
||||
return "#%06x" % (image_id & 0xFFFFFF)
|
||||
|
||||
|
||||
def kitty_placeholder_rows(cols: int, rows: int) -> list[str]:
|
||||
"""Build the placeholder text grid for an *rows*×*cols* image.
|
||||
|
||||
Each line is one row of the grid: the first cell carries the row diacritic
|
||||
(column defaults to 0), and the remaining ``cols-1`` bare placeholders let
|
||||
the terminal auto-increment the column. The foreground color (the image id)
|
||||
is applied by the caller / Ink, not embedded here.
|
||||
"""
|
||||
cols = max(1, cols)
|
||||
out: list[str] = []
|
||||
for r in range(max(1, rows)):
|
||||
idx = min(r, len(_ROWCOL_DIACRITICS) - 1)
|
||||
first = _KITTY_PLACEHOLDER + chr(_ROWCOL_DIACRITICS[idx])
|
||||
out.append(first + _KITTY_PLACEHOLDER * (cols - 1))
|
||||
return out
|
||||
|
||||
|
||||
def _encode_kitty_virtual(frame, *, image_id: int, cols: int, rows: int) -> str:
|
||||
"""Transmit a frame as a kitty *virtual* placement for Unicode placeholders.
|
||||
|
||||
``a=T`` transmits and creates the placement in one shot; ``U=1`` marks it
|
||||
virtual (no on-screen output, cursor untouched); ``q=2`` suppresses the
|
||||
terminal's OK/error replies that would otherwise corrupt the host app's
|
||||
output. Re-sending with the same ``i`` replaces the image, so the static
|
||||
placeholder cells animate underneath.
|
||||
"""
|
||||
ctrl = f"a=T,U=1,i={image_id},c={cols},r={rows},f=100,q=2"
|
||||
return _kitty_apc(ctrl, base64.standard_b64encode(_png_bytes(frame)).decode("ascii"))
|
||||
|
||||
|
||||
def _encode_iterm(frame, *, cell_cols: int | None = None, cell_rows: int | None = None) -> str:
|
||||
"""Encode one frame as an iTerm2 inline image (OSC 1337 File)."""
|
||||
payload = base64.standard_b64encode(_png_bytes(frame)).decode("ascii")
|
||||
size = len(payload)
|
||||
args = [f"inline=1", f"size={size}", "preserveAspectRatio=1"]
|
||||
if cell_cols:
|
||||
args.append(f"width={cell_cols}")
|
||||
if cell_rows:
|
||||
args.append(f"height={cell_rows}")
|
||||
return f"\x1b]1337;File={';'.join(args)}:{payload}\x07"
|
||||
|
||||
|
||||
def _encode_sixel(frame) -> str:
|
||||
"""Encode one frame as DEC sixel.
|
||||
|
||||
Quantizes to an adaptive palette (≤255 colors) and emits the sixel band
|
||||
stream. Pillow has no sixel writer, so this is a compact hand-rolled
|
||||
encoder. Transparent pixels render as background (color register skipped).
|
||||
"""
|
||||
from PIL import Image
|
||||
|
||||
rgba = frame
|
||||
# Composite onto transparent-as-skip: track alpha to decide background.
|
||||
pal = rgba.convert("RGB").quantize(colors=255, method=Image.MEDIANCUT)
|
||||
palette = pal.getpalette() or []
|
||||
px = pal.load()
|
||||
alpha = rgba.getchannel("A").load()
|
||||
w, h = pal.size
|
||||
|
||||
out = ["\x1bP0;1;0q", '"1;1;%d;%d' % (w, h)]
|
||||
# Color register definitions (sixel uses 0..100 scale).
|
||||
used = sorted({px[x, y] for y in range(h) for x in range(w)})
|
||||
for idx in used:
|
||||
r = palette[idx * 3] if idx * 3 < len(palette) else 0
|
||||
g = palette[idx * 3 + 1] if idx * 3 + 1 < len(palette) else 0
|
||||
b = palette[idx * 3 + 2] if idx * 3 + 2 < len(palette) else 0
|
||||
out.append("#%d;2;%d;%d;%d" % (idx, r * 100 // 255, g * 100 // 255, b * 100 // 255))
|
||||
|
||||
# Emit in 6-row bands.
|
||||
for band in range(0, h, 6):
|
||||
for color_idx in used:
|
||||
line = ["#%d" % color_idx]
|
||||
run_char = None
|
||||
run_len = 0
|
||||
|
||||
def flush():
|
||||
nonlocal run_char, run_len
|
||||
if run_char is None:
|
||||
return
|
||||
if run_len > 3:
|
||||
line.append("!%d%s" % (run_len, run_char))
|
||||
else:
|
||||
line.append(run_char * run_len)
|
||||
run_char, run_len = None, 0
|
||||
|
||||
for x in range(w):
|
||||
bits = 0
|
||||
for bit in range(6):
|
||||
y = band + bit
|
||||
if y < h and alpha[x, y] > 32 and px[x, y] == color_idx:
|
||||
bits |= 1 << bit
|
||||
ch = chr(63 + bits)
|
||||
if ch == run_char:
|
||||
run_len += 1
|
||||
else:
|
||||
flush()
|
||||
run_char, run_len = ch, 1
|
||||
flush()
|
||||
out.append("".join(line) + "$") # carriage return within band
|
||||
out.append("-") # next band
|
||||
out.append("\x1b\\")
|
||||
return "".join(out)
|
||||
|
||||
|
||||
_HALF_BLOCK = "▀"
|
||||
|
||||
# A single half-block cell: top pixel + bottom pixel as (r, g, b, a) tuples.
|
||||
Cell = tuple[tuple[int, int, int, int], tuple[int, int, int, int]]
|
||||
|
||||
|
||||
def _downscale_cells(frame, *, target_cols: int) -> list[list[Cell]]:
|
||||
"""Downscale a frame to a grid of half-block cells.
|
||||
|
||||
Each cell pairs a top and bottom pixel so one terminal row encodes two
|
||||
pixel rows. Returns rows of ``((tr,tg,tb,ta),(br,bg,bb,ba))`` — the
|
||||
framework-neutral representation shared by the ANSI encoder (CLI) and the
|
||||
structured ``cells`` API (Ink).
|
||||
"""
|
||||
from PIL import Image
|
||||
|
||||
target_cols = max(4, target_cols)
|
||||
aspect = frame.height / max(1, frame.width)
|
||||
target_rows = max(2, int(round(target_cols * aspect * 0.5)) * 2)
|
||||
small = frame.resize((target_cols, target_rows), Image.LANCZOS).convert("RGBA")
|
||||
px = small.load()
|
||||
|
||||
grid: list[list[Cell]] = []
|
||||
for y in range(0, target_rows, 2):
|
||||
row: list[Cell] = []
|
||||
for x in range(target_cols):
|
||||
top = px[x, y]
|
||||
bottom = px[x, y + 1] if y + 1 < target_rows else (0, 0, 0, 0)
|
||||
row.append((top, bottom))
|
||||
grid.append(row)
|
||||
return grid
|
||||
|
||||
|
||||
def _encode_unicode(frame, *, target_cols: int) -> str:
|
||||
"""Downscale to truecolor ANSI half-blocks (one char = 2 vertical pixels)."""
|
||||
lines: list[str] = []
|
||||
for row in _downscale_cells(frame, target_cols=target_cols):
|
||||
cells: list[str] = []
|
||||
for (tr, tg, tb, ta), (br, bg, bb, ba) in row:
|
||||
if ta < 32 and ba < 32:
|
||||
cells.append("\x1b[0m ") # fully transparent → blank
|
||||
continue
|
||||
cells.append(f"\x1b[38;2;{tr};{tg};{tb}m\x1b[48;2;{br};{bg};{bb}m{_HALF_BLOCK}")
|
||||
lines.append("".join(cells) + "\x1b[0m")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# Public renderer
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class PetRenderer:
|
||||
"""Holds a pet's spritesheet and yields encoded frames per (state, index).
|
||||
|
||||
Construct once per pet, then call :meth:`frame` on an animation timer.
|
||||
Cheap to call repeatedly — decoded frames are cached.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
spritesheet: str | Path,
|
||||
*,
|
||||
mode: str = "unicode",
|
||||
scale: float = DEFAULT_SCALE,
|
||||
unicode_cols: int = 20,
|
||||
frame_w: int = FRAME_W,
|
||||
frame_h: int = FRAME_H,
|
||||
frames_per_state: int = FRAMES_PER_STATE,
|
||||
) -> None:
|
||||
self.spritesheet = str(spritesheet)
|
||||
self.mode = mode if mode in RENDER_MODES else "unicode"
|
||||
self.scale = scale
|
||||
self.unicode_cols = unicode_cols
|
||||
self.frame_w = frame_w
|
||||
self.frame_h = frame_h
|
||||
self.frames_per_state = frames_per_state
|
||||
|
||||
@property
|
||||
def available(self) -> bool:
|
||||
return self.mode != "off" and Path(self.spritesheet).is_file()
|
||||
|
||||
def frame_count(self, state: PetState | str) -> int:
|
||||
return len(self._frames(state))
|
||||
|
||||
def _frames(self, state: PetState | str):
|
||||
value = state.value if isinstance(state, PetState) else str(state)
|
||||
scale_w = max(1, int(self.frame_w * self.scale))
|
||||
scale_h = max(1, int(self.frame_h * self.scale))
|
||||
return _frames_for(
|
||||
self.spritesheet,
|
||||
value,
|
||||
self.frame_w,
|
||||
self.frame_h,
|
||||
self.frames_per_state,
|
||||
scale_w,
|
||||
scale_h,
|
||||
)
|
||||
|
||||
def cells(self, state: PetState | str, index: int, *, cols: int | None = None) -> list[list[Cell]]:
|
||||
"""Return one frame as a half-block cell grid (framework-neutral).
|
||||
|
||||
Used by the TUI, which renders the grid with native Ink color props
|
||||
instead of raw ANSI. Returns ``[]`` when no frame is available.
|
||||
"""
|
||||
frames = self._frames(state)
|
||||
if not frames:
|
||||
return []
|
||||
frame = frames[index % len(frames)]
|
||||
return _downscale_cells(frame, target_cols=cols or self.unicode_cols)
|
||||
|
||||
def _cell_box(self, frame) -> tuple[int, int]:
|
||||
"""Terminal cell box for a scaled frame (~8×16 px per cell).
|
||||
|
||||
Must match :meth:`frame` graphics sizing — kitty stretches the image to
|
||||
fill ``c``×``r`` cells, so these must reflect the scaled pixel
|
||||
dimensions, not a native-aspect column count (that upscales small pets).
|
||||
"""
|
||||
return max(1, frame.width // 8), max(1, frame.height // 16)
|
||||
|
||||
def kitty_payload(self, state: PetState | str, *, image_id: int) -> dict | None:
|
||||
"""Build the kitty Unicode-placeholder payload for one state.
|
||||
|
||||
Returns ``{cols, rows, placeholder, frames}`` where ``frames`` is a
|
||||
list of transmit escapes (one per animation frame, all reusing
|
||||
``image_id``) and ``placeholder`` is the static text grid Ink paints.
|
||||
Placement geometry is derived from the scaled frame pixels (via
|
||||
:meth:`_cell_box`), not ``unicode_cols`` — kitty upscales to fill
|
||||
``c``×``r`` cells. ``None`` when no frame is available.
|
||||
"""
|
||||
frames = self._frames(state)
|
||||
if not frames:
|
||||
return None
|
||||
cols, rows = self._cell_box(frames[0])
|
||||
return {
|
||||
"cols": cols,
|
||||
"rows": rows,
|
||||
"placeholder": kitty_placeholder_rows(cols, rows),
|
||||
"frames": [
|
||||
_encode_kitty_virtual(f, image_id=image_id, cols=cols, rows=rows) for f in frames
|
||||
],
|
||||
}
|
||||
|
||||
def frame(self, state: PetState | str, index: int) -> str:
|
||||
"""Return the encoded escape string for one frame, or ``""``.
|
||||
|
||||
``index`` is taken modulo the available frame count so callers can pass
|
||||
a free-running counter.
|
||||
"""
|
||||
if self.mode == "off":
|
||||
return ""
|
||||
frames = self._frames(state)
|
||||
if not frames:
|
||||
return ""
|
||||
frame = frames[index % len(frames)]
|
||||
cell_cols, cell_rows = self._cell_box(frame)
|
||||
|
||||
try:
|
||||
if self.mode == "kitty":
|
||||
return _encode_kitty(frame, cell_cols=cell_cols, cell_rows=cell_rows)
|
||||
if self.mode == "iterm":
|
||||
return _encode_iterm(frame, cell_cols=cell_cols, cell_rows=cell_rows)
|
||||
if self.mode == "sixel":
|
||||
return _encode_sixel(frame)
|
||||
return _encode_unicode(frame, target_cols=self.unicode_cols)
|
||||
except Exception as exc: # noqa: BLE001 - degrade silently
|
||||
logger.debug("pet frame encode failed (mode=%s): %s", self.mode, exc)
|
||||
return ""
|
||||
|
||||
|
||||
def build_renderer(
|
||||
spritesheet: str | Path,
|
||||
*,
|
||||
configured_mode: str | None = None,
|
||||
scale: float = DEFAULT_SCALE,
|
||||
unicode_cols: int = 20,
|
||||
stream=None,
|
||||
) -> PetRenderer:
|
||||
"""Convenience factory: resolve the mode from config+env, then construct."""
|
||||
mode = resolve_mode(configured_mode, stream=stream)
|
||||
return PetRenderer(
|
||||
spritesheet,
|
||||
mode=mode,
|
||||
scale=scale,
|
||||
unicode_cols=unicode_cols,
|
||||
)
|
||||
81
agent/pet/state.py
Normal file
81
agent/pet/state.py
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
"""Map agent activity → a :class:`PetState`.
|
||||
|
||||
This is the one place the "what is the agent doing right now?" → "which
|
||||
animation row?" decision lives. Each surface feeds it the signals it already
|
||||
tracks:
|
||||
|
||||
- CLI — ``KawaiiSpinner`` waiting/thinking state + tool outcomes.
|
||||
- TUI — gateway ``tool.start/complete`` + ``message.delta/complete`` events.
|
||||
- Desktop — the ``$busy``/``$awaitingResponse``/tool-event nanostores
|
||||
(re-implemented in TS, but mirroring this priority order).
|
||||
|
||||
Keeping the priority order here (and documenting it) lets the TypeScript
|
||||
mirror stay faithful without a second design.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterable
|
||||
from typing import Any
|
||||
|
||||
from agent.pet.constants import PetState
|
||||
|
||||
|
||||
def todos_all_done(todos: Iterable[Any] | None) -> bool:
|
||||
"""True iff there's ≥1 todo and every one is completed/cancelled.
|
||||
|
||||
The "celebrate" beat (``JUMP``) fires when a plan finishes; this mirrors
|
||||
the TUI's ``isTodoDone`` so the trigger is defined once across surfaces.
|
||||
Accepts dicts (``{"status": ...}``) or objects with a ``status`` attr.
|
||||
"""
|
||||
items = list(todos or [])
|
||||
if not items:
|
||||
return False
|
||||
|
||||
def _status(t: Any) -> Any:
|
||||
return t.get("status") if isinstance(t, dict) else getattr(t, "status", None)
|
||||
|
||||
return all(_status(t) in ("completed", "cancelled") for t in items)
|
||||
|
||||
|
||||
def derive_pet_state(
|
||||
*,
|
||||
busy: bool = False,
|
||||
awaiting_input: bool = False,
|
||||
error: bool = False,
|
||||
celebrate: bool = False,
|
||||
just_completed: bool = False,
|
||||
tool_running: bool = False,
|
||||
reasoning: bool = False,
|
||||
) -> PetState:
|
||||
"""Resolve the animation state from coarse activity signals.
|
||||
|
||||
Priority (highest first) — only one row can show at a time, so the most
|
||||
salient signal wins:
|
||||
|
||||
1. ``error`` → ``FAILED`` (a tool/turn just failed)
|
||||
2. ``celebrate`` → ``JUMP`` (explicit success beat, e.g. todos done)
|
||||
3. ``just_completed`` → ``WAVE`` (turn finished cleanly / greeting)
|
||||
4. ``awaiting_input`` → ``WAITING`` (blocked on the user — a clarify/approval
|
||||
prompt is open; this outranks the in-flight signals below because the turn
|
||||
is paused on *you*, even though a tool is technically mid-call)
|
||||
5. ``tool_running`` → ``RUN`` (a tool is executing)
|
||||
6. ``reasoning`` → ``REVIEW`` (model is thinking / reading)
|
||||
7. ``busy`` → ``RUN`` (turn in flight, unspecified work)
|
||||
8. otherwise → ``IDLE``
|
||||
"""
|
||||
if error:
|
||||
return PetState.FAILED
|
||||
if celebrate:
|
||||
return PetState.JUMP
|
||||
if just_completed:
|
||||
return PetState.WAVE
|
||||
if awaiting_input:
|
||||
return PetState.WAITING
|
||||
if tool_running:
|
||||
return PetState.RUN
|
||||
if reasoning:
|
||||
return PetState.REVIEW
|
||||
if busy:
|
||||
return PetState.RUN
|
||||
return PetState.IDLE
|
||||
316
agent/pet/store.py
Normal file
316
agent/pet/store.py
Normal file
|
|
@ -0,0 +1,316 @@
|
|||
"""On-disk pet store — install / list / resolve pets.
|
||||
|
||||
Pets live under ``get_hermes_home()/pets/<slug>/`` so every profile gets its
|
||||
own set (we deliberately do **not** reuse petdex's ``~/.codex/pets`` default —
|
||||
that's owned by the petdex npm CLI and isn't profile-aware). Each installed
|
||||
pet directory holds:
|
||||
|
||||
pets/<slug>/
|
||||
pet.json # {id, displayName, description, spritesheetPath}
|
||||
spritesheet.webp # (or .png)
|
||||
|
||||
The active pet is resolved from the caller-supplied ``display.pet.slug`` config
|
||||
value (falling back to the first installed pet), so this module stays free of
|
||||
the config loader.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_DOWNLOAD_TIMEOUT = 60.0
|
||||
|
||||
|
||||
class PetStoreError(RuntimeError):
|
||||
"""Raised on install/IO failures."""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class InstalledPet:
|
||||
"""A pet present on disk."""
|
||||
|
||||
slug: str
|
||||
display_name: str
|
||||
description: str
|
||||
directory: Path
|
||||
spritesheet: Path
|
||||
|
||||
@property
|
||||
def exists(self) -> bool:
|
||||
return self.spritesheet.is_file()
|
||||
|
||||
|
||||
def pets_dir() -> Path:
|
||||
"""Return the profile-scoped pets directory (created on demand)."""
|
||||
path = get_hermes_home() / "pets"
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
return path
|
||||
|
||||
|
||||
def _read_pet_json(directory: Path) -> dict:
|
||||
pet_json = directory / "pet.json"
|
||||
if not pet_json.is_file():
|
||||
return {}
|
||||
try:
|
||||
return json.loads(pet_json.read_text(encoding="utf-8"))
|
||||
except (OSError, ValueError) as exc:
|
||||
logger.debug("unreadable pet.json in %s: %s", directory, exc)
|
||||
return {}
|
||||
|
||||
|
||||
def _resolve_spritesheet(directory: Path, meta: dict) -> Path:
|
||||
"""Find the spritesheet for a pet dir.
|
||||
|
||||
Honors ``spritesheetPath`` from pet.json, else probes the conventional
|
||||
filenames (``spritesheet.{webp,png}`` and petdex R2's ``sprite.webp``).
|
||||
"""
|
||||
declared = str(meta.get("spritesheetPath", "") or "").strip()
|
||||
if declared:
|
||||
candidate = directory / declared
|
||||
if candidate.is_file():
|
||||
return candidate
|
||||
for name in ("spritesheet.webp", "spritesheet.png", "sprite.webp", "sprite.png"):
|
||||
candidate = directory / name
|
||||
if candidate.is_file():
|
||||
return candidate
|
||||
# Default expectation even if missing, so callers get a stable path.
|
||||
return directory / "spritesheet.webp"
|
||||
|
||||
|
||||
def load_pet(slug: str) -> InstalledPet | None:
|
||||
"""Return the :class:`InstalledPet` for *slug*, or ``None`` if absent."""
|
||||
slug = slug.strip()
|
||||
directory = pets_dir() / slug
|
||||
if not directory.is_dir():
|
||||
return None
|
||||
meta = _read_pet_json(directory)
|
||||
return InstalledPet(
|
||||
slug=slug,
|
||||
display_name=str(meta.get("displayName", "") or slug),
|
||||
description=str(meta.get("description", "") or ""),
|
||||
directory=directory,
|
||||
spritesheet=_resolve_spritesheet(directory, meta),
|
||||
)
|
||||
|
||||
|
||||
def installed_pets() -> list[InstalledPet]:
|
||||
"""Return every installed pet (dirs containing a usable spritesheet)."""
|
||||
out: list[InstalledPet] = []
|
||||
for child in sorted(pets_dir().iterdir()):
|
||||
if not child.is_dir():
|
||||
continue
|
||||
pet = load_pet(child.name)
|
||||
if pet and pet.exists:
|
||||
out.append(pet)
|
||||
return out
|
||||
|
||||
|
||||
def resolve_active_pet(configured_slug: str | None = None) -> InstalledPet | None:
|
||||
"""Resolve which pet to display.
|
||||
|
||||
Precedence: the configured slug (``display.pet.slug``) if it's installed,
|
||||
otherwise the first installed pet alphabetically, otherwise ``None``.
|
||||
"""
|
||||
if configured_slug:
|
||||
pet = load_pet(configured_slug.strip())
|
||||
if pet and pet.exists:
|
||||
return pet
|
||||
pets = installed_pets()
|
||||
return pets[0] if pets else None
|
||||
|
||||
|
||||
def install_pet(slug: str, *, force: bool = False, timeout: float = _DOWNLOAD_TIMEOUT) -> InstalledPet:
|
||||
"""Download *slug* from the manifest into the pets directory.
|
||||
|
||||
Idempotent: a fully-installed pet is returned as-is unless *force*. Raises
|
||||
:class:`PetStoreError` / :class:`~agent.pet.manifest.ManifestError` on
|
||||
failure.
|
||||
"""
|
||||
from agent.pet.manifest import find_entry
|
||||
|
||||
slug = slug.strip()
|
||||
existing = load_pet(slug)
|
||||
if existing and existing.exists and not force:
|
||||
return existing
|
||||
|
||||
entry = find_entry(slug, timeout=timeout)
|
||||
if entry is None:
|
||||
raise PetStoreError(f"pet '{slug}' is not in the petdex manifest")
|
||||
|
||||
directory = pets_dir() / slug
|
||||
directory.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
sprite_ext = ".png" if entry.spritesheet_url.lower().split("?")[0].endswith(".png") else ".webp"
|
||||
sprite_path = directory / f"spritesheet{sprite_ext}"
|
||||
|
||||
_download(entry.spritesheet_url, sprite_path, timeout=timeout)
|
||||
|
||||
# Fetch the upstream pet.json if present; otherwise synthesize a minimal
|
||||
# one so the local layout is self-describing.
|
||||
meta: dict = {}
|
||||
if entry.pet_json_url:
|
||||
try:
|
||||
meta = _download_json(entry.pet_json_url, timeout=timeout)
|
||||
except Exception as exc: # noqa: BLE001 - non-fatal, fall back below
|
||||
logger.debug("pet.json fetch failed for %s: %s", slug, exc)
|
||||
if not isinstance(meta, dict) or not meta:
|
||||
meta = {"id": slug, "displayName": entry.display_name, "description": ""}
|
||||
meta["spritesheetPath"] = sprite_path.name
|
||||
meta.setdefault("id", slug)
|
||||
meta.setdefault("displayName", entry.display_name)
|
||||
(directory / "pet.json").write_text(json.dumps(meta, indent=2), encoding="utf-8")
|
||||
|
||||
pet = load_pet(slug)
|
||||
if pet is None or not pet.exists:
|
||||
raise PetStoreError(f"install of '{slug}' did not produce a spritesheet")
|
||||
return pet
|
||||
|
||||
|
||||
_THUMB_FRAME_W = 192
|
||||
_THUMB_FRAME_H = 208
|
||||
_THUMB_W = 96 # rendered ~40px; 2x+ keeps it crisp on HiDPI
|
||||
|
||||
|
||||
def _thumbs_dir() -> Path:
|
||||
path = pets_dir() / ".thumbs"
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
return path
|
||||
|
||||
|
||||
def _is_petdex_host(url: str) -> bool:
|
||||
"""True only for petdex.dev hosts — bounds server-side fetch (anti-SSRF)."""
|
||||
from urllib.parse import urlparse
|
||||
|
||||
try:
|
||||
host = (urlparse(url).hostname or "").lower()
|
||||
except ValueError:
|
||||
return False
|
||||
return host == "petdex.dev" or host.endswith(".petdex.dev")
|
||||
|
||||
|
||||
def thumbnail_png(slug: str, *, source_url: str = "", timeout: float = 30.0) -> bytes | None:
|
||||
"""Return a small idle-frame PNG for *slug*, cached on disk.
|
||||
|
||||
Crops the top-left (idle, frame 0) cell of the spritesheet and downsamples
|
||||
it to a thumbnail. Source preference: an installed spritesheet on disk, else
|
||||
*source_url* — but only when it points at petdex (so the gateway never
|
||||
fetches an arbitrary client-supplied URL). Returns ``None`` when there's no
|
||||
usable source or Pillow/network fails; callers render a placeholder.
|
||||
|
||||
Doing this server-side sidesteps the renderer's CSP / R2 hotlink limits that
|
||||
break a direct ``<img src=cdn>`` and lets the result ride the authenticated
|
||||
gateway as a same-origin data URL.
|
||||
"""
|
||||
slug = slug.strip()
|
||||
if not slug:
|
||||
return None
|
||||
|
||||
cache = _thumbs_dir() / f"{slug}.png"
|
||||
if cache.is_file():
|
||||
try:
|
||||
return cache.read_bytes()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
sheet_bytes: bytes | None = None
|
||||
pet = load_pet(slug)
|
||||
if pet and pet.exists:
|
||||
try:
|
||||
sheet_bytes = pet.spritesheet.read_bytes()
|
||||
except OSError:
|
||||
sheet_bytes = None
|
||||
|
||||
if sheet_bytes is None and source_url and _is_petdex_host(source_url):
|
||||
try:
|
||||
import httpx
|
||||
|
||||
resp = httpx.get(
|
||||
source_url,
|
||||
timeout=timeout,
|
||||
follow_redirects=True,
|
||||
headers={"User-Agent": "hermes-agent-petdex"},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
sheet_bytes = resp.content
|
||||
except Exception as exc: # noqa: BLE001 - cosmetic, degrade to placeholder
|
||||
logger.debug("thumb fetch failed for %s: %s", slug, exc)
|
||||
|
||||
if not sheet_bytes:
|
||||
return None
|
||||
|
||||
try:
|
||||
import io
|
||||
|
||||
from PIL import Image
|
||||
|
||||
with Image.open(io.BytesIO(sheet_bytes)) as im:
|
||||
frame = im.convert("RGBA").crop(
|
||||
(0, 0, min(_THUMB_FRAME_W, im.width), min(_THUMB_FRAME_H, im.height))
|
||||
)
|
||||
height = round(_THUMB_W * _THUMB_FRAME_H / _THUMB_FRAME_W)
|
||||
frame = frame.resize((_THUMB_W, height), Image.NEAREST)
|
||||
buf = io.BytesIO()
|
||||
frame.save(buf, format="PNG")
|
||||
data = buf.getvalue()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.debug("thumb crop failed for %s: %s", slug, exc)
|
||||
return None
|
||||
|
||||
try:
|
||||
cache.write_bytes(data)
|
||||
except OSError:
|
||||
pass
|
||||
return data
|
||||
|
||||
|
||||
def remove_pet(slug: str) -> bool:
|
||||
"""Delete an installed pet directory. Returns True if anything was removed."""
|
||||
import shutil
|
||||
|
||||
directory = pets_dir() / slug.strip()
|
||||
if not directory.is_dir():
|
||||
return False
|
||||
shutil.rmtree(directory, ignore_errors=True)
|
||||
return not directory.exists()
|
||||
|
||||
|
||||
def _download(url: str, dest: Path, *, timeout: float) -> None:
|
||||
import httpx
|
||||
|
||||
try:
|
||||
with httpx.stream(
|
||||
"GET",
|
||||
url,
|
||||
timeout=timeout,
|
||||
follow_redirects=True,
|
||||
headers={"User-Agent": "hermes-agent-petdex"},
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
tmp = dest.with_suffix(dest.suffix + ".part")
|
||||
with tmp.open("wb") as fh:
|
||||
for chunk in resp.iter_bytes():
|
||||
fh.write(chunk)
|
||||
tmp.replace(dest)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
raise PetStoreError(f"download failed for {url}: {exc}") from exc
|
||||
|
||||
|
||||
def _download_json(url: str, *, timeout: float) -> dict:
|
||||
import httpx
|
||||
|
||||
resp = httpx.get(
|
||||
url,
|
||||
timeout=timeout,
|
||||
follow_redirects=True,
|
||||
headers={"User-Agent": "hermes-agent-petdex"},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
return data if isinstance(data, dict) else {}
|
||||
|
|
@ -1633,6 +1633,31 @@ DEFAULT_CONFIG = {
|
|||
"fields": ["model", "context_pct", "cwd"], # Order shown; drop any to hide
|
||||
},
|
||||
"copy_shortcut": "auto", # "auto" (platform default) | "ctrl_c" | "ctrl_shift_c" | "disabled"
|
||||
# Petdex animated mascot (https://github.com/crafter-station/petdex).
|
||||
# A purely cosmetic sprite that reacts to agent activity across the
|
||||
# CLI, TUI, and desktop app. Manage with `hermes pets`. Disabled until
|
||||
# a pet is installed + selected (no effect on prompt caching — this is
|
||||
# a display concern only).
|
||||
"pet": {
|
||||
"enabled": False,
|
||||
# Active pet slug; resolved against installed pets in
|
||||
# get_hermes_home()/pets/. Empty → first installed pet.
|
||||
"slug": "",
|
||||
# Terminal render protocol for CLI/TUI:
|
||||
# auto — detect kitty/iTerm2/sixel, else unicode half-blocks
|
||||
# kitty | iterm | sixel | unicode | off
|
||||
"render_mode": "auto",
|
||||
# Master size scalar (relative to native 192×208 frames). One knob
|
||||
# shrinks every surface: the desktop canvas scales its pixels by it
|
||||
# and the CLI/TUI derive their terminal column width from it. The
|
||||
# half-block fallback clamps to a legibility floor (it can't shrink
|
||||
# as far as true-pixel kitty/GUI without turning to mush).
|
||||
"scale": 0.33,
|
||||
# Hard override for terminal column width. 0 = auto (derive from
|
||||
# scale); set a positive int only to pin the half-block/kitty width
|
||||
# independently of scale.
|
||||
"unicode_cols": 0,
|
||||
},
|
||||
},
|
||||
|
||||
# Web dashboard settings
|
||||
|
|
|
|||
371
tests/agent/test_pet_engine.py
Normal file
371
tests/agent/test_pet_engine.py
Normal file
|
|
@ -0,0 +1,371 @@
|
|||
"""Tests for the petdex pet engine (agent/pet/*).
|
||||
|
||||
Behavior/invariant focused — no network, no live manifest. A tiny synthetic
|
||||
spritesheet is generated with Pillow so render paths exercise real decode
|
||||
without depending on a downloaded pet.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
|
||||
import pytest
|
||||
|
||||
from agent.pet import constants, render, state, store
|
||||
from agent.pet.constants import FRAME_H, FRAME_W, PetState
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# state mapping — priority invariants
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_derive_idle_default():
|
||||
assert state.derive_pet_state() is PetState.IDLE
|
||||
# awaiting input uses the dedicated waiting row when available.
|
||||
assert state.derive_pet_state(awaiting_input=True) is PetState.WAITING
|
||||
|
||||
|
||||
def test_derive_priority_order():
|
||||
# error beats everything
|
||||
assert state.derive_pet_state(error=True, celebrate=True, busy=True) is PetState.FAILED
|
||||
# celebrate beats completion/tool
|
||||
assert state.derive_pet_state(celebrate=True, just_completed=True, tool_running=True) is PetState.JUMP
|
||||
# completion beats waiting/tool
|
||||
assert state.derive_pet_state(just_completed=True, awaiting_input=True) is PetState.WAVE
|
||||
# waiting (blocked on the user) outranks the in-flight signals — a clarify
|
||||
# mid-turn pauses on you even though a tool is technically still open.
|
||||
assert state.derive_pet_state(awaiting_input=True, tool_running=True, busy=True) is PetState.WAITING
|
||||
# tool beats reasoning
|
||||
assert state.derive_pet_state(tool_running=True, reasoning=True) is PetState.RUN
|
||||
# reasoning beats bare-busy
|
||||
assert state.derive_pet_state(reasoning=True, busy=True) is PetState.REVIEW
|
||||
# bare busy runs
|
||||
assert state.derive_pet_state(busy=True) is PetState.RUN
|
||||
|
||||
|
||||
def test_todos_all_done():
|
||||
# empty / falsy → not done (no plan to celebrate)
|
||||
assert state.todos_all_done(None) is False
|
||||
assert state.todos_all_done([]) is False
|
||||
# any open item → not done
|
||||
assert state.todos_all_done([{"status": "completed"}, {"status": "pending"}]) is False
|
||||
assert state.todos_all_done([{"status": "in_progress"}]) is False
|
||||
# every item terminal → done (completed and/or cancelled)
|
||||
assert state.todos_all_done([{"status": "completed"}, {"status": "cancelled"}]) is True
|
||||
|
||||
# objects with a .status attr work too (mirrors dict + attr access)
|
||||
class _T:
|
||||
def __init__(self, status):
|
||||
self.status = status
|
||||
|
||||
assert state.todos_all_done([_T("completed")]) is True
|
||||
assert state.todos_all_done([_T("completed"), _T("pending")]) is False
|
||||
|
||||
|
||||
def test_state_row_index_maps_to_supported_atlas_taxonomies():
|
||||
# Current Petdex sheets are 8 columns x 9 rows.
|
||||
assert constants.state_row_index(PetState.IDLE, 9) == 0
|
||||
assert constants.state_row_index(PetState.WAVE, 9) == 3
|
||||
assert constants.state_row_index(PetState.JUMP, 9) == 4
|
||||
assert constants.state_row_index(PetState.FAILED, 9) == 5
|
||||
assert constants.state_row_index(PetState.WAITING, 9) == 6
|
||||
assert constants.state_row_index(PetState.RUN, 9) == 7
|
||||
assert constants.state_row_index(PetState.REVIEW, 9) == 8
|
||||
|
||||
# Legacy Hermes/petdex sheets were 8 rows with Hermes state names packed in
|
||||
# order. Keep those readable instead of forcing old installs through the
|
||||
# newer Codex taxonomy.
|
||||
assert constants.state_row_index(PetState.WAVE, 8) == 1
|
||||
assert constants.state_row_index(PetState.RUN, 8) == 2
|
||||
assert constants.state_row_index(PetState.FAILED, 8) == 3
|
||||
assert constants.state_row_index(PetState.REVIEW, 8) == 4
|
||||
assert constants.state_row_index(PetState.JUMP, 8) == 5
|
||||
assert constants.state_row_index(PetState.WAITING, 8) == 0
|
||||
|
||||
# Alias rows resolve as expected.
|
||||
assert constants.state_row_index("wave", 9) == constants.state_row_index("waving", 9) == 3
|
||||
assert constants.state_row_index("jump", 9) == constants.state_row_index("jumping", 9) == 4
|
||||
assert constants.state_row_index("run", 9) == constants.state_row_index("running", 9) == 7
|
||||
|
||||
# unknown row names clamp to idle (row 0), never raise
|
||||
assert constants.state_row_index("nonsense") == 0
|
||||
|
||||
|
||||
def test_cols_for_scale_is_monotonic_and_floored():
|
||||
# scale is the master size knob: smaller scale never yields more columns,
|
||||
# and half-blocks clamp to a legibility floor rather than devolving to mush.
|
||||
sizes = [constants.cols_for_scale(s) for s in (0.1, 0.3, 0.5, 0.7, 1.0, 1.5)]
|
||||
assert sizes == sorted(sizes)
|
||||
assert all(c >= constants.UNICODE_MIN_COLS for c in sizes)
|
||||
# tiny scales pin to the floor; large scales grow past it.
|
||||
assert constants.cols_for_scale(0.05) == constants.UNICODE_MIN_COLS
|
||||
assert constants.cols_for_scale(0.33) == constants.UNICODE_MIN_COLS
|
||||
assert constants.cols_for_scale(2.0) > constants.UNICODE_MIN_COLS
|
||||
|
||||
|
||||
def test_resolve_cols_override_else_scale():
|
||||
# 0 / falsy → derive from scale; a positive int hard-overrides scale.
|
||||
assert constants.resolve_cols(0.7, 0) == constants.cols_for_scale(0.7)
|
||||
assert constants.resolve_cols(0.7, None) == constants.cols_for_scale(0.7)
|
||||
assert constants.resolve_cols(2.0, 12) == 12
|
||||
assert constants.resolve_cols(0.1, -5) == constants.cols_for_scale(0.1)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# synthetic spritesheet fixture
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture
|
||||
def boba_like(tmp_path, monkeypatch):
|
||||
"""Install a synthetic 8-col × 9-row pet into a temp HERMES_HOME."""
|
||||
from PIL import Image
|
||||
|
||||
home = tmp_path / ".hermes"
|
||||
home.mkdir()
|
||||
monkeypatch.setenv("HERMES_HOME", str(home))
|
||||
|
||||
cols, rows = 8, 9
|
||||
sheet = Image.new("RGBA", (FRAME_W * cols, FRAME_H * rows), (0, 0, 0, 0))
|
||||
# paint each row a distinct opaque color so frames are non-empty
|
||||
for r in range(rows):
|
||||
color = (20 + r * 25, 60, 120, 255)
|
||||
for c in range(cols):
|
||||
block = Image.new("RGBA", (FRAME_W, FRAME_H), color)
|
||||
sheet.paste(block, (c * FRAME_W, r * FRAME_H))
|
||||
|
||||
pet_dir = store.pets_dir() / "boba"
|
||||
pet_dir.mkdir(parents=True, exist_ok=True)
|
||||
sheet.save(pet_dir / "spritesheet.webp")
|
||||
(pet_dir / "pet.json").write_text(
|
||||
'{"id":"boba","displayName":"Boba","description":"d","spritesheetPath":"spritesheet.webp"}'
|
||||
)
|
||||
return pet_dir
|
||||
|
||||
|
||||
def test_store_install_resolution(boba_like):
|
||||
pets = store.installed_pets()
|
||||
assert [p.slug for p in pets] == ["boba"]
|
||||
assert store.installed_pets()[0].exists
|
||||
|
||||
# configured slug wins when installed
|
||||
assert store.resolve_active_pet("boba").slug == "boba"
|
||||
# bogus slug falls back to first installed
|
||||
assert store.resolve_active_pet("does-not-exist").slug == "boba"
|
||||
# display metadata flows from pet.json
|
||||
assert store.load_pet("boba").display_name == "Boba"
|
||||
|
||||
|
||||
def test_store_remove(boba_like):
|
||||
assert store.remove_pet("boba") is True
|
||||
assert store.installed_pets() == []
|
||||
assert store.remove_pet("boba") is False # idempotent
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# render — decode + every encoder produces output
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_renderer_decodes_frames(boba_like):
|
||||
sprite = store.load_pet("boba").spritesheet
|
||||
r = render.PetRenderer(str(sprite), mode="unicode", scale=0.5, unicode_cols=12)
|
||||
assert r.available
|
||||
# standard sheet yields FRAMES_PER_STATE frames per state
|
||||
assert r.frame_count("idle") == constants.FRAMES_PER_STATE
|
||||
assert r.frame_count(PetState.RUN) == constants.FRAMES_PER_STATE
|
||||
|
||||
|
||||
def test_trims_trailing_blank_frames(tmp_path):
|
||||
"""Ragged state rows (real frames + transparent padding) trim to real count.
|
||||
|
||||
petdex sheets are left-packed: a state with fewer than FRAMES_PER_STATE real
|
||||
frames pads the trailing columns transparent. Stepping into one flashes the
|
||||
pet blank, so the engine must stop the row at the first gap.
|
||||
"""
|
||||
from PIL import Image
|
||||
|
||||
cols, rows = 8, 9
|
||||
sheet = Image.new("RGBA", (FRAME_W * cols, FRAME_H * rows), (0, 0, 0, 0))
|
||||
# row index -> number of real (opaque) frames; the rest stay transparent.
|
||||
# Codex row taxonomy: idle, running-right, running-left, wave, jump, failed,
|
||||
# waiting, run, review.
|
||||
real = {0: 6, 3: 4, 4: 5, 5: 8, 7: 6, 8: 5}
|
||||
for r, k in real.items():
|
||||
for c in range(k):
|
||||
block = Image.new("RGBA", (FRAME_W, FRAME_H), (200, 80, 80, 255))
|
||||
sheet.paste(block, (c * FRAME_W, r * FRAME_H))
|
||||
sprite = tmp_path / "ragged.webp"
|
||||
sheet.save(sprite)
|
||||
|
||||
r = render.PetRenderer(str(sprite), mode="unicode", scale=0.5)
|
||||
# Full rows cap at FRAMES_PER_STATE; ragged rows trim to their real count.
|
||||
assert r.frame_count("idle") == constants.FRAMES_PER_STATE
|
||||
assert r.frame_count("run") == constants.FRAMES_PER_STATE
|
||||
assert r.frame_count("wave") == 4
|
||||
assert r.frame_count("jump") == 5
|
||||
assert r.frame_count("failed") == constants.FRAMES_PER_STATE
|
||||
assert r.frame_count("review") == 5
|
||||
|
||||
# Every stepped frame is non-empty — no blank flash for the trimmed states.
|
||||
for state in ("wave", "jump", "review"):
|
||||
for i in range(r.frame_count(state)):
|
||||
assert r.frame(state, i), f"{state}[{i}] rendered blank"
|
||||
|
||||
counts = render.state_frame_counts(str(sprite))
|
||||
assert counts == {
|
||||
"idle": 6,
|
||||
"wave": 4,
|
||||
"run": 6,
|
||||
"failed": 6,
|
||||
"review": 5,
|
||||
"jump": 5,
|
||||
"waiting": 0,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize("mode", ["unicode", "kitty", "iterm", "sixel"])
|
||||
def test_every_encoder_emits(boba_like, mode):
|
||||
sprite = store.load_pet("boba").spritesheet
|
||||
r = render.PetRenderer(str(sprite), mode=mode, scale=0.4)
|
||||
frame = r.frame("run", 1)
|
||||
assert isinstance(frame, str) and frame, f"{mode} produced no frame"
|
||||
if mode == "unicode":
|
||||
assert "\x1b[" in frame # has color escapes
|
||||
elif mode == "kitty":
|
||||
assert frame.startswith("\x1b_G")
|
||||
elif mode == "iterm":
|
||||
assert frame.startswith("\x1b]1337;File=")
|
||||
elif mode == "sixel":
|
||||
assert frame.startswith("\x1bP")
|
||||
|
||||
|
||||
def test_frame_index_wraps(boba_like):
|
||||
sprite = store.load_pet("boba").spritesheet
|
||||
r = render.PetRenderer(str(sprite), mode="unicode", scale=0.4)
|
||||
# index beyond count wraps rather than indexing out of range
|
||||
assert r.frame("idle", 999) == r.frame("idle", 999 % r.frame_count("idle"))
|
||||
|
||||
|
||||
def test_cells_grid_shape(boba_like):
|
||||
sprite = store.load_pet("boba").spritesheet
|
||||
r = render.PetRenderer(str(sprite), mode="unicode", scale=0.4, unicode_cols=14)
|
||||
grid = r.cells("run", 0, cols=14)
|
||||
assert grid, "no cells produced"
|
||||
# every row is the requested width; every cell is (top, bottom) RGBA pairs
|
||||
assert all(len(row) == 14 for row in grid)
|
||||
(top, bottom) = grid[0][0]
|
||||
assert len(top) == 4 and len(bottom) == 4
|
||||
# missing-sheet renderer yields no cells, never raises
|
||||
assert render.PetRenderer(str(sprite.parent / "missing.webp"), mode="unicode").cells("idle", 0) == []
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
# render — kitty Unicode placeholders (TUI graphics path)
|
||||
# ─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_kitty_image_id_stable_bounded_nonzero():
|
||||
# Deterministic per slug so re-renders reuse the same terminal-side image,
|
||||
# and always a valid 24-bit-encodable, non-zero id.
|
||||
a = render.kitty_image_id("boba")
|
||||
assert a == render.kitty_image_id("boba")
|
||||
assert 1 <= a <= 0x7FFF
|
||||
|
||||
|
||||
def test_kitty_color_hex_decodes_to_id():
|
||||
# The placeholder's foreground color IS the image id (24-bit). The terminal
|
||||
# reconstructs id = (r<<16)|(g<<8)|b, so the hex must round-trip.
|
||||
for slug in ("boba", "clawd", "pixel-fox"):
|
||||
image_id = render.kitty_image_id(slug)
|
||||
h = render.kitty_color_hex(image_id)
|
||||
assert h.startswith("#") and len(h) == 7
|
||||
assert int(h[1:], 16) == image_id
|
||||
|
||||
|
||||
def test_kitty_placeholder_rows_grid_contract():
|
||||
cols, rows = 18, 10
|
||||
grid = render.kitty_placeholder_rows(cols, rows)
|
||||
assert len(grid) == rows
|
||||
placeholder = "\U0010eeee"
|
||||
for r, row in enumerate(grid):
|
||||
# Each line is exactly `cols` placeholder cells (combining diacritics
|
||||
# are zero-width, so this is the rendered width Ink must measure).
|
||||
assert row.count(placeholder) == cols
|
||||
# First cell carries this row's diacritic; the rest inherit row + col.
|
||||
assert row.startswith(placeholder + chr(render._ROWCOL_DIACRITICS[r]))
|
||||
|
||||
|
||||
def test_kitty_payload_structure(boba_like):
|
||||
sprite = store.load_pet("boba").spritesheet
|
||||
image_id = render.kitty_image_id("boba")
|
||||
scale = 0.4
|
||||
r = render.PetRenderer(str(sprite), mode="kitty", scale=scale, unicode_cols=18)
|
||||
payload = r.kitty_payload("run", image_id=image_id)
|
||||
assert payload is not None
|
||||
# placement box must follow scaled pixels, not unicode_cols (kitty upscales to c×r).
|
||||
frames = r._frames("run")
|
||||
expect_cols, expect_rows = r._cell_box(frames[0])
|
||||
assert payload["cols"] == expect_cols
|
||||
assert payload["rows"] == expect_rows
|
||||
assert expect_cols < 18 # 0.4 scale is much smaller than a pinned 18-col box
|
||||
# placeholder grid matches the requested geometry
|
||||
assert len(payload["placeholder"]) == payload["rows"]
|
||||
# one transmit escape per animation frame, each a kitty virtual placement
|
||||
assert len(payload["frames"]) == r.frame_count("run")
|
||||
for esc in payload["frames"]:
|
||||
assert esc.startswith("\x1b_G")
|
||||
assert esc.endswith("\x1b\\")
|
||||
assert f"i={image_id}" in esc
|
||||
assert "a=T" in esc and "U=1" in esc
|
||||
assert f"c={payload['cols']}" in esc and f"r={payload['rows']}" in esc
|
||||
|
||||
|
||||
def test_kitty_payload_none_when_no_frames(tmp_path):
|
||||
r = render.PetRenderer(str(tmp_path / "missing.webp"), mode="kitty")
|
||||
assert r.kitty_payload("idle", image_id=1) is None
|
||||
|
||||
|
||||
def test_off_mode_and_missing_sheet_degrade(tmp_path):
|
||||
# off mode never emits
|
||||
r_off = render.PetRenderer(str(tmp_path / "nope.webp"), mode="off")
|
||||
assert r_off.frame("idle", 0) == ""
|
||||
# missing sheet → not available, empty frames, no raise
|
||||
r_missing = render.PetRenderer(str(tmp_path / "nope.webp"), mode="unicode")
|
||||
assert not r_missing.available
|
||||
assert r_missing.frame("idle", 0) == ""
|
||||
|
||||
|
||||
def test_resolve_mode_non_tty_is_off():
|
||||
# a non-tty stream forces 'off' regardless of configured mode
|
||||
assert render.resolve_mode("kitty", stream=io.StringIO()) == "off"
|
||||
assert render.resolve_mode("auto", stream=io.StringIO()) == "off"
|
||||
|
||||
|
||||
def test_detect_terminal_graphics_env(monkeypatch):
|
||||
for key in ("KITTY_WINDOW_ID", "TERM_PROGRAM", "ITERM_SESSION_ID", "WEZTERM_PANE", "TERM"):
|
||||
monkeypatch.delenv(key, raising=False)
|
||||
|
||||
monkeypatch.setenv("KITTY_WINDOW_ID", "1")
|
||||
assert render.detect_terminal_graphics() == "kitty"
|
||||
monkeypatch.delenv("KITTY_WINDOW_ID")
|
||||
|
||||
monkeypatch.setenv("TERM_PROGRAM", "iTerm.app")
|
||||
assert render.detect_terminal_graphics() == "iterm"
|
||||
monkeypatch.delenv("TERM_PROGRAM")
|
||||
|
||||
monkeypatch.setenv("TERM", "xterm-256color")
|
||||
assert render.detect_terminal_graphics() == "unicode"
|
||||
|
||||
|
||||
def test_vscode_terminal_ignores_leaked_graphics_env(monkeypatch):
|
||||
# The VS Code / Cursor integrated terminal can't show inline images by
|
||||
# default, yet inherits ITERM_SESSION_ID/KITTY_WINDOW_ID when launched from
|
||||
# those terminals. TERM_PROGRAM=vscode must win → unicode, never a protocol
|
||||
# whose escapes the embedded terminal would silently drop.
|
||||
for key in ("KITTY_WINDOW_ID", "TERM_PROGRAM", "ITERM_SESSION_ID", "WEZTERM_PANE", "TERM"):
|
||||
monkeypatch.delenv(key, raising=False)
|
||||
monkeypatch.setenv("TERM_PROGRAM", "vscode")
|
||||
|
||||
assert render.detect_terminal_graphics() == "unicode"
|
||||
for leaked in ("ITERM_SESSION_ID", "KITTY_WINDOW_ID", "WEZTERM_PANE"):
|
||||
monkeypatch.setenv(leaked, "1")
|
||||
assert render.detect_terminal_graphics() == "unicode"
|
||||
monkeypatch.delenv(leaked)
|
||||
Loading…
Add table
Add a link
Reference in a new issue