mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-03 02:11:48 +00:00
The audit of v4.1 surfaced ~70 issues across the five scripts and three
reference docs — most user-visible (silent file overwrites, status-error
misclassified as success, X-API-Key leaked to S3 on /api/view redirect,
Cloud endpoints that 404 because they were renamed). v5.0.0 fixes those
and fills the gaps that previously forced users to write their own glue
(WebSocket monitoring, batch/sweep, img2img upload helper, dep auto-fix,
log fetch, health check, example workflows).
Critical fixes
- run_workflow.py: poll_status now checks status_str==error BEFORE
completed:true, so a failed run no longer reports success
- run_workflow.py: download_output streams to disk via safe_path_join,
preserves server subfolder structure (no silent overwrites), and
retries with exponential backoff
- run_workflow.py: refuses to overwrite a link with a literal in
inject_params (would silently break wiring)
- _common.py: _StripSensitiveOnRedirectSession (subclasses
requests.Session.rebuild_auth) drops X-API-Key/Cookie on cross-host
redirects — fixes a real key-leak path through Cloud's signed-URL
download flow. Tested by the cross-host security tests listed below.
- Cloud routing (verified live): /history → /history_v2,
/models/<f> → /experiment/models/<f>, plus folder aliases for the
unet ↔ diffusion_models and clip ↔ text_encoders rename
- check_deps.py: distinguishes 200/empty vs 404 folder_not_found vs
403 free-tier; emits concrete fix_command per missing dep
- extract_schema.py: prompt vs negative_prompt determined by tracing
KSampler.{positive,negative} connections (incl. through Reroute /
Primitive nodes) instead of meta-title heuristic; symmetric
duplicate-name resolution; cycle-safe trace_to_node
- hardware_check.py: multi-GPU pick-best, Apple variant detection,
Rosetta detection, WSL2, ROCm --json, disk-space check, optional
PyTorch probe; powershell preferred over deprecated wmic
- comfyui_setup.sh: prefers pipx → uvx → pip --user (with PEP-668
fallback); idempotent — skips relaunch if server already up;
configurable port/workspace; persistent log; SIGINT trap
New scripts
- run_batch.py — count or sweep (cartesian product), parallel up to
cloud tier limit
- ws_monitor.py — real-time WebSocket viewer; saves preview frames
- auto_fix_deps.py — runs comfy node install / model download for
whatever check_deps reports missing (with --dry-run)
- health_check.py — single command that runs the verification checklist
(comfy-cli + server + checkpoints + optional smoke test that cancels
itself to avoid burning compute)
- fetch_logs.py — pull traceback / status messages for a prompt_id
Coverage expansion
- Param patterns now cover Flux (BasicScheduler, BasicGuider,
RandomNoise, ModelSamplingFlux), SD3, Wan/Hunyuan/LTX video,
IPAdapter, rgthree, easy-use, AnimateDiff
- Embedding refs in CLIPTextEncode strings extracted as model deps
- ckpt_name / vae_name / lora_name / unet_name now controllable so
workflows can be retargeted per run
Examples
- workflows/{sd15,sdxl,flux_dev}_txt2img.json
- workflows/sdxl_{img2img,inpaint}.json
- workflows/upscale_4x.json
- workflows/{animatediff_video,wan_video_t2v}.json + README
Tests
- 117 tests (105 unit + 8 cloud integration + 4 cross-host security)
- Cloud tests auto-skip without COMFY_CLOUD_API_KEY; verified end-to-end
against live cloud API
Backwards compatibility
- All existing CLI flags continue to work; new behavior is opt-in
(--ws, --input-image, --randomize-seed, --flat-output, etc.)
267 lines
10 KiB
Python
Executable file
267 lines
10 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
ws_monitor.py — Real-time ComfyUI WebSocket monitor.
|
|
|
|
Connects to /ws and pretty-prints execution events: node start/finish, sampling
|
|
progress, cached nodes, errors. Optionally writes preview frames to disk.
|
|
|
|
Useful for:
|
|
- Watching a long-running job in real time without parsing JSON yourself
|
|
- Saving in-progress preview frames for video / animation workflows
|
|
- Debugging "why is this hanging?" — see exactly which node is stuck
|
|
|
|
Usage:
|
|
# Local — watch all jobs from this client_id
|
|
python3 ws_monitor.py
|
|
|
|
# Cloud — watch a specific prompt_id
|
|
python3 ws_monitor.py --host https://cloud.comfy.org \
|
|
--prompt-id abc-123-def
|
|
|
|
# Save preview frames to ./previews/
|
|
python3 ws_monitor.py --previews ./previews
|
|
|
|
Requires: websocket-client (`pip install websocket-client`).
|
|
Falls back to a clear error message when not installed.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import struct
|
|
import sys
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
|
from _common import ( # noqa: E402
|
|
DEFAULT_LOCAL_HOST, ENV_API_KEY, log, new_client_id, resolve_api_key, is_cloud_host,
|
|
)
|
|
|
|
|
|
# Binary frame types from ComfyUI WebSocket protocol
|
|
# Event-type codes: parse_binary_frame() compares these against the first
# big-endian uint32 of every binary WebSocket frame.
BINARY_PREVIEW_IMAGE = 1
BINARY_TEXT = 3
BINARY_PREVIEW_IMAGE_WITH_METADATA = 4

# Image-format codes: the second uint32 of a BINARY_PREVIEW_IMAGE frame.
IMAGE_TYPE_JPEG = 1
IMAGE_TYPE_PNG = 2

# ANSI escape sequences used by fmt_color(); RESET ends any styled span.
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
CYAN = "\033[36m"
|
|
|
|
|
|
def fmt_color(s: str, color: str, *, color_on: bool = True) -> str:
    """Return *s* wrapped in the escape sequence *color* and closed by RESET.

    When ``color_on`` is false the text is handed back unchanged, so callers
    can route every print through this function regardless of terminal type.
    """
    if not color_on:
        return s
    return f"{color}{s}{RESET}"
|
|
|
|
|
|
# File signatures used to recognise the real format of a preview image.
_JPEG_MAGIC = b"\xff\xd8\xff"
_PNG_MAGIC = b"\x89PNG\r\n\x1a\n"

# MIME type -> file extension, used only when the signature is not recognised.
_MIME_EXTENSIONS = {
    "image/jpeg": "jpg",
    "image/jpg": "jpg",
    "image/png": "png",
    "image/webp": "webp",
}


def _guess_preview_ext(meta: object, image_bytes: bytes) -> str:
    """Pick a file extension for a preview that arrived with JSON metadata.

    The image signature is trusted first because it describes the actual
    bytes. If it is not recognised, a MIME hint in the metadata is tried.
    NOTE(review): the hint is assumed to live under ``image_type`` — confirm
    against the server version in use. ``"png"`` remains the final fallback.
    """
    if image_bytes[:3] == _JPEG_MAGIC:
        return "jpg"
    if image_bytes[:8] == _PNG_MAGIC:
        return "png"
    mime = meta.get("image_type") if isinstance(meta, dict) else None
    if isinstance(mime, str):
        return _MIME_EXTENSIONS.get(mime.strip().lower(), "png")
    return "png"


def parse_binary_frame(data: bytes) -> dict | None:
    """Decode one binary WebSocket frame into a plain dict.

    Every frame starts with a big-endian uint32 event type; the meaning of
    the following uint32 depends on that type.

    Returns:
        ``None`` when the frame is too short or its declared lengths run past
        the end of *data*; otherwise a dict whose ``"kind"`` is one of

        * ``"preview"`` — ``image_type``, ``ext`` (jpg/png/bin), ``image_bytes``
        * ``"preview_with_metadata"`` — ``metadata``, ``image_bytes``, ``ext``
        * ``"text"`` — ``node_id``, ``text``
        * ``"unknown"`` — ``type_code``, ``size``
    """
    if len(data) < 8:
        return None
    # Both header words are read up front; the second one is an image-format
    # code, a metadata length or a node-id length depending on the event type.
    type_code, second_word = struct.unpack_from(">II", data)

    if type_code == BINARY_PREVIEW_IMAGE:
        if second_word == IMAGE_TYPE_JPEG:
            ext = "jpg"
        elif second_word == IMAGE_TYPE_PNG:
            ext = "png"
        else:
            ext = "bin"
        return {
            "kind": "preview",
            "image_type": second_word,
            "ext": ext,
            "image_bytes": data[8:],
        }

    if type_code == BINARY_PREVIEW_IMAGE_WITH_METADATA:
        if len(data) < 12:
            return None
        meta_end = 8 + second_word
        if len(data) < meta_end:
            return None
        raw_meta = data[8:meta_end]
        try:
            meta = json.loads(raw_meta.decode("utf-8"))
        except (ValueError, RecursionError):
            # Undecodable metadata must not cost us the image: keep a short
            # excerpt for debugging and carry on.
            meta = {"raw": raw_meta[:200].decode("utf-8", "replace")}
        image_bytes = data[meta_end:]
        return {
            "kind": "preview_with_metadata",
            "metadata": meta,
            "image_bytes": image_bytes,
            # Previously hard-coded to "png", which mislabelled JPEG previews.
            "ext": _guess_preview_ext(meta, image_bytes),
        }

    if type_code == BINARY_TEXT:
        nid_end = 8 + second_word
        if len(data) < nid_end:
            return None
        return {
            "kind": "text",
            "node_id": data[8:nid_end].decode("utf-8", "replace"),
            "text": data[nid_end:].decode("utf-8", "replace"),
        }

    return {"kind": "unknown", "type_code": type_code, "size": len(data)}
|
|
|
|
|
|
# Exit status for each event that ends a run; only honoured with --prompt-id.
_TERMINAL_EXIT_CODES = {
    "execution_success": 0,
    "execution_error": 1,
    "execution_interrupted": 1,
}

# Output categories counted in the one-line summary of an "executed" event.
_OUTPUT_KEYS = ("images", "video", "videos", "gifs", "audio", "files")


def _dig(obj: object, *keys: str, default: object = None) -> object:
    """Walk nested dicts along *keys*; return *default* if any step is missing."""
    for key in keys:
        if not isinstance(obj, dict) or key not in obj:
            return default
        obj = obj[key]
    return obj


def _build_ws_url(host: str, client_id: str, token: str | None = None) -> str:
    """Build the ``/ws`` URL for *host*, keeping any base path it carries.

    ``https``/``wss`` hosts map to ``wss``; everything else maps to ``ws``.
    Query values are percent-encoded so unusual client IDs or tokens cannot
    corrupt the URL.
    """
    from urllib.parse import urlencode  # not among the file-level imports

    parts = urlparse(host if "://" in host else f"http://{host}")
    scheme = "wss" if parts.scheme in ("https", "wss") else "ws"
    query = {"clientId": client_id}
    if token:
        query["token"] = token
    base_path = parts.path.rstrip("/")
    return f"{scheme}://{parts.netloc}{base_path}/ws?{urlencode(query)}"


def _print_event(mtype: str, mdata: dict, *, color_on: bool) -> None:
    """Pretty-print one decoded JSON event to stdout."""
    pid = mdata.get("prompt_id")

    def emit(text: str, color: str) -> None:
        print(fmt_color(text, color, color_on=color_on))

    if mtype == "status":
        qr = _dig(mdata, "status", "exec_info", "queue_remaining", default="?")
        emit(f"[status] queue_remaining={qr}", DIM)
    elif mtype == "execution_start":
        emit(f"[start] prompt_id={pid}", BOLD)
    elif mtype == "executing":
        node = mdata.get("node")
        if node:
            emit(f" [executing] node={node}", CYAN)
        else:
            emit(f" [executing] (workflow done) prompt_id={pid}", DIM)
    elif mtype == "progress":
        v, m = mdata.get("value", 0), mdata.get("max", 0)
        pct = (v / m * 100) if m else 0
        print(f" [progress] {v}/{m} ({pct:5.1f}%) node={mdata.get('node')}")
    elif mtype == "progress_state":
        nodes = mdata.get("nodes")
        if not isinstance(nodes, dict):
            nodes = {}
        # Accept both a boolean "running" flag and a "state" string.
        # NOTE(review): the "state" form reflects how current ComfyUI servers
        # are understood to report node progress — confirm against the server.
        running = [
            node_id
            for node_id, info in nodes.items()
            if isinstance(info, dict)
            and (info.get("running") or info.get("state") == "running")
        ]
        if running:
            emit(f" [progress_state] running={running}", DIM)
    elif mtype == "executed":
        node = mdata.get("node")
        out = mdata.get("output")
        if not isinstance(out, dict):
            out = {}
        summary_parts = [f"{key}={len(out[key])}" for key in _OUTPUT_KEYS if out.get(key)]
        summary = ", ".join(summary_parts) if summary_parts else "(no files)"
        emit(f" [executed] node={node} {summary}", GREEN)
    elif mtype == "execution_cached":
        cached = mdata.get("nodes") or []
        if cached:
            emit(f" [cached] {len(cached)} nodes skipped", DIM)
    elif mtype == "execution_success":
        emit(f"[success] prompt_id={pid}", GREEN + BOLD)
    elif mtype == "execution_error":
        exc_type = mdata.get("exception_type", "?")
        exc_msg = mdata.get("exception_message", "?")
        emit(f"[error] {exc_type}: {exc_msg}", RED + BOLD)
        tb = mdata.get("traceback")
        if tb:
            for line in tb if isinstance(tb, list) else [tb]:
                emit(f" {line}", RED)
    elif mtype == "execution_interrupted":
        emit(f"[interrupted] prompt_id={pid}", YELLOW)
    elif mtype == "notification":
        emit(f"[notification] {mdata.get('value', '')}", DIM)
    else:
        # Unknown / lightly-used types: print compactly
        emit(f"[{mtype}] {json.dumps(mdata, default=str)[:200]}", DIM)


def main(argv: list[str] | None = None) -> int:
    """Connect to a ComfyUI ``/ws`` endpoint and print events until done.

    Args:
        argv: Command-line arguments; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Process exit status:
        ``0`` — watched prompt succeeded, or the connection went idle / closed
        while watching all jobs;
        ``1`` — websocket-client missing, connection failed, the watched prompt
        errored or was interrupted, or the server closed the connection before
        the watched prompt finished;
        ``130`` — interrupted with Ctrl-C.
    """
    p = argparse.ArgumentParser(description="Real-time ComfyUI WebSocket monitor")
    p.add_argument("--host", default=DEFAULT_LOCAL_HOST, help="ComfyUI server URL")
    p.add_argument("--api-key", help=f"API key for cloud (or set ${ENV_API_KEY} env var)")
    p.add_argument("--client-id", default=None, help="Client ID (default: random UUID)")
    p.add_argument("--prompt-id", default=None,
                   help="Filter to a specific prompt_id (default: all jobs)")
    p.add_argument("--previews", default=None,
                   help="Directory to save in-progress preview frames")
    p.add_argument("--no-color", action="store_true", help="Disable ANSI colour")
    # The value is applied as a receive timeout, i.e. an idle limit, so the
    # help text says so (it used to promise a hard cap on total duration).
    p.add_argument("--timeout", type=float, default=600.0,
                   help="Exit after this many seconds without any message (default 600s)")
    args = p.parse_args(argv)

    try:
        import websocket  # type: ignore[import-not-found]
    except ImportError:
        print(json.dumps({
            "error": "websocket-client not installed",
            "install": "pip install websocket-client",
        }))
        return 1

    api_key = resolve_api_key(args.api_key)
    cloud = is_cloud_host(args.host)
    client_id = args.client_id or new_client_id()

    # The token is only sent to cloud hosts. Because it travels in the query
    # string, a second URL with the secret masked is built for anything that
    # gets logged or printed.
    token = api_key if (cloud and api_key) else None
    ws_url = _build_ws_url(args.host, client_id, token)
    display_url = _build_ws_url(args.host, client_id, "REDACTED" if token else None)

    color_on = not args.no_color and sys.stdout.isatty()

    preview_dir = Path(args.previews).expanduser() if args.previews else None
    if preview_dir:
        preview_dir.mkdir(parents=True, exist_ok=True)
        log(f"Saving previews to {preview_dir}")

    log(f"Connecting to {display_url} (client_id={client_id})")
    if args.prompt_id:
        log(f"Filtering messages to prompt_id={args.prompt_id}")

    try:
        # create_connection applies `timeout` to the socket, so every recv()
        # below is bounded by it as well.
        ws = websocket.create_connection(ws_url, timeout=args.timeout)
    except (websocket.WebSocketException, OSError) as exc:
        detail = str(exc).replace(ws_url, display_url)
        if token:
            detail = detail.replace(token, "REDACTED")
        print(json.dumps({"error": f"could not connect to {display_url}", "detail": detail}))
        return 1

    preview_counter = 0
    try:
        while True:
            try:
                msg = ws.recv()
            except websocket.WebSocketTimeoutException:
                log(f"Idle for {args.timeout}s — exiting")
                return 0
            except websocket.WebSocketConnectionClosedException:
                log("Connection closed by server")
                # A watched prompt that never reported its outcome is a failure.
                return 1 if args.prompt_id else 0

            if isinstance(msg, bytes):
                frame = parse_binary_frame(msg)
                if (
                    frame is not None
                    and preview_dir
                    and frame["kind"] in ("preview", "preview_with_metadata")
                ):
                    img_bytes = frame.get("image_bytes", b"")
                    if img_bytes:
                        ext = frame.get("ext", "png")
                        out = preview_dir / f"preview_{preview_counter:05d}.{ext}"
                        out.write_bytes(img_bytes)
                        preview_counter += 1
                        log(f" [preview] saved {out.name} ({len(img_bytes)} bytes)")
                continue

            try:
                payload = json.loads(msg)
            except (TypeError, ValueError, RecursionError):
                continue
            if not isinstance(payload, dict):
                continue

            mtype = payload.get("type", "")
            if not isinstance(mtype, str):
                mtype = str(mtype)
            mdata = payload.get("data") or {}
            if not isinstance(mdata, dict):
                # Keep odd payloads printable through the generic branch.
                mdata = {"raw": mdata}

            pid = mdata.get("prompt_id")
            if args.prompt_id and pid and pid != args.prompt_id:
                continue

            _print_event(mtype, mdata, color_on=color_on)

            if args.prompt_id and mtype in _TERMINAL_EXIT_CODES:
                return _TERMINAL_EXIT_CODES[mtype]

    except KeyboardInterrupt:
        log("Interrupted")
        return 130
    finally:
        # Best-effort shutdown: a failure while closing must not mask the
        # exit status chosen above.
        try:
            ws.close()
        except Exception:
            pass
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|