chore: address copilot comments

This commit is contained in:
Austin Pickett 2026-04-24 12:51:04 -04:00
parent 5500b51800
commit 850fac14e3
8 changed files with 87 additions and 31 deletions

View file

@ -13,12 +13,15 @@ already passes to ``write``). No JSON-RPC envelope here — the dashboard's
Failure mode: silent. The agent loop must never block waiting for the
sidecar to drain. A dead WS short-circuits all subsequent writes.
Actual ``send`` calls run on a daemon thread so the TeeTransport's
``write`` returns after enqueueing (best-effort; drop when the queue is full).
"""
from __future__ import annotations
import json
import logging
import queue
import threading
from typing import Optional
@ -29,15 +32,21 @@ except ImportError: # pragma: no cover - websockets is a required install path
_log = logging.getLogger(__name__)
_DRAIN_STOP = object()
_QUEUE_MAX = 256
class WsPublisherTransport:
__slots__ = ("_url", "_lock", "_ws", "_dead")
__slots__ = ("_url", "_lock", "_ws", "_dead", "_q", "_worker")
def __init__(self, url: str, *, connect_timeout: float = 2.0) -> None:
self._url = url
self._lock = threading.Lock()
self._ws: Optional[object] = None
self._dead = False
self._q: queue.Queue[object] = queue.Queue(maxsize=_QUEUE_MAX)
self._worker: Optional[threading.Thread] = None
if ws_connect is None:
self._dead = True
@ -51,30 +60,66 @@ class WsPublisherTransport:
self._dead = True
self._ws = None
return
self._worker = threading.Thread(
target=self._drain,
name="hermes-ws-pub",
daemon=True,
)
self._worker.start()
def _drain(self) -> None:
while True:
item = self._q.get()
if item is _DRAIN_STOP:
return
if not isinstance(item, str):
continue
if self._ws is None:
continue
try:
with self._lock:
if self._ws is not None:
self._ws.send(item) # type: ignore[union-attr]
except Exception as exc:
_log.debug("event publisher write failed: %s", exc)
self._dead = True
self._ws = None
def write(self, obj: dict) -> bool:
if self._dead or self._ws is None:
if self._dead or self._ws is None or self._worker is None:
return False
line = json.dumps(obj, ensure_ascii=False)
try:
with self._lock:
self._ws.send(json.dumps(obj, ensure_ascii=False)) # type: ignore[union-attr]
self._q.put_nowait(line)
return True
except Exception as exc:
_log.debug("event publisher write failed: %s", exc)
self._dead = True
self._ws = None
except queue.Full:
return False
def close(self) -> None:
self._dead = True
w = self._worker
if w is not None and w.is_alive():
try:
self._q.put_nowait(_DRAIN_STOP)
except queue.Full:
# Best-effort: if the queue is wedged, the daemon thread
# will be torn down with the process.
pass
w.join(timeout=3.0)
self._worker = None
if self._ws is None:
return
try:
self._ws.close() # type: ignore[union-attr]
with self._lock:
if self._ws is not None:
self._ws.close() # type: ignore[union-attr]
except Exception:
pass

View file

@ -107,12 +107,14 @@ class TeeTransport:
self._secondaries = secondaries
def write(self, obj: dict) -> bool:
# Primary first so a slow sidecar (WS publisher) never delays Ink/stdio.
ok = self._primary.write(obj)
for sec in self._secondaries:
try:
sec.write(obj)
except Exception:
pass
return self._primary.write(obj)
return ok
def close(self) -> None:
try: