mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-08 03:01:47 +00:00
115 lines
3.9 KiB
Python
115 lines
3.9 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Any
|
|
|
|
import pyte
|
|
|
|
|
|
class AsciinemaStreamDecoder:
|
|
def __init__(self, *, default_width: int = 80, default_height: int = 24) -> None:
|
|
self._default_width = max(1, int(default_width))
|
|
self._default_height = max(1, int(default_height))
|
|
self._buffer = ""
|
|
self._has_header = False
|
|
self.width = self._default_width
|
|
self.height = self._default_height
|
|
self._screen = pyte.Screen(self.width, self.height)
|
|
self._stream = pyte.Stream(self._screen)
|
|
|
|
def reset(self) -> None:
|
|
self._buffer = ""
|
|
self._has_header = False
|
|
self.width = self._default_width
|
|
self.height = self._default_height
|
|
self._screen = pyte.Screen(self.width, self.height)
|
|
self._stream = pyte.Stream(self._screen)
|
|
|
|
def feed(self, chunk: str | bytes) -> None:
|
|
if not chunk:
|
|
return
|
|
if isinstance(chunk, bytes):
|
|
chunk = chunk.decode("utf-8", errors="replace")
|
|
self._buffer += chunk
|
|
while True:
|
|
line, sep, rest = self._buffer.partition("\n")
|
|
if not sep:
|
|
break
|
|
self._buffer = rest
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
parsed = self._parse_json_line(line)
|
|
if parsed is None:
|
|
continue
|
|
if not self._has_header:
|
|
if isinstance(parsed, dict):
|
|
self._init_from_header(parsed)
|
|
continue
|
|
if isinstance(parsed, list):
|
|
self._has_header = True
|
|
self._apply_event(parsed)
|
|
continue
|
|
continue
|
|
if isinstance(parsed, list):
|
|
self._apply_event(parsed)
|
|
|
|
def render(self) -> str:
|
|
return "\n".join(self._screen.display)
|
|
|
|
def _parse_json_line(self, line: str) -> Any | None:
|
|
try:
|
|
return json.loads(line)
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
def _init_from_header(self, header: dict[str, Any]) -> None:
|
|
width = _coerce_int(
|
|
header.get("width") or header.get("columns") or header.get("cols"),
|
|
self._default_width,
|
|
)
|
|
height = _coerce_int(
|
|
header.get("height") or header.get("rows") or header.get("lines"),
|
|
self._default_height,
|
|
)
|
|
self.width = max(1, width)
|
|
self.height = max(1, height)
|
|
self._screen = pyte.Screen(self.width, self.height)
|
|
self._stream = pyte.Stream(self._screen)
|
|
self._has_header = True
|
|
|
|
def _apply_event(self, event: list[Any]) -> None:
|
|
if len(event) < 2:
|
|
return
|
|
event_type = event[1]
|
|
payload = event[2] if len(event) > 2 else ""
|
|
if event_type == "o":
|
|
if isinstance(payload, str):
|
|
self._stream.feed(payload)
|
|
elif event_type == "r":
|
|
width, height = _parse_resize(payload)
|
|
if width and height:
|
|
self.width = width
|
|
self.height = height
|
|
self._screen.resize(width, height)
|
|
|
|
|
|
def _coerce_int(value: Any, default: int) -> int:
|
|
try:
|
|
return int(value)
|
|
except (TypeError, ValueError):
|
|
return int(default)
|
|
|
|
|
|
def _parse_resize(payload: Any) -> tuple[int, int]:
|
|
if isinstance(payload, str) and "x" in payload:
|
|
left, right = payload.lower().split("x", 1)
|
|
return _coerce_int(left, 0), _coerce_int(right, 0)
|
|
if isinstance(payload, dict):
|
|
width = _coerce_int(payload.get("width") or payload.get("columns") or payload.get("cols"), 0)
|
|
height = _coerce_int(payload.get("height") or payload.get("rows") or payload.get("lines"), 0)
|
|
return width, height
|
|
if isinstance(payload, list) and len(payload) >= 2:
|
|
return _coerce_int(payload[0], 0), _coerce_int(payload[1], 0)
|
|
return 0, 0
|
|
|