hermes-agent/skills/creative/comfyui/scripts/fetch_logs.py
SHL0MS a7780fe05f fix(skills/comfyui): bug fixes, cloud parity, expanded coverage, examples, tests
The audit of v4.1 surfaced ~70 issues across the five scripts and three
reference docs — most user-visible (silent file overwrites, status-error
misclassified as success, X-API-Key leaked to S3 on /api/view redirect,
Cloud endpoints that 404 because they were renamed). v5.0.0 fixes those
and fills the gaps that previously forced users to write their own glue
(WebSocket monitoring, batch/sweep, img2img upload helper, dep auto-fix,
log fetch, health check, example workflows).

Critical fixes
- run_workflow.py: poll_status now checks status_str==error BEFORE
  completed:true, so a failed run no longer reports success
- run_workflow.py: download_output streams to disk via safe_path_join,
  preserves server subfolder structure (no silent overwrites), and
  retries with exponential backoff
- run_workflow.py: refuses to overwrite a link with a literal in
  inject_params (would silently break wiring)
- _common.py: _StripSensitiveOnRedirectSession (subclasses
  requests.Session and overrides rebuild_auth) drops X-API-Key/Cookie
  on cross-host redirects; fixes a real key-leak path through Cloud's
  signed-URL download flow. Tested.
- Cloud routing (verified live): /history → /history_v2,
  /models/<f> → /experiment/models/<f>, plus folder aliases for the
  unet ↔ diffusion_models and clip ↔ text_encoders rename
- check_deps.py: distinguishes 200/empty vs 404 folder_not_found vs
  403 free-tier; emits concrete fix_command per missing dep
- extract_schema.py: prompt vs negative_prompt determined by tracing
  KSampler.{positive,negative} connections (incl. through Reroute /
  Primitive nodes) instead of meta-title heuristic; symmetric
  duplicate-name resolution; cycle-safe trace_to_node
- hardware_check.py: multi-GPU pick-best, Apple variant detection,
  Rosetta detection, WSL2, ROCm --json, disk-space check, optional
  PyTorch probe; powershell preferred over deprecated wmic
- comfyui_setup.sh: prefers pipx → uvx → pip --user (with PEP-668
  fallback); idempotent — skips relaunch if server already up;
  configurable port/workspace; persistent log; SIGINT trap
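
The poll_status ordering fix listed above can be sketched as follows. This is a minimal illustration and not the code in run_workflow.py: `classify_status` is a hypothetical helper, and the `status_str` / `completed` field names are the ones fetch_logs.py reads from a history entry.

```python
from __future__ import annotations


def classify_status(status: dict) -> str:
    """Classify a history-entry status dict (hypothetical helper).

    The error check runs first. Per the commit note, a failed run may
    still carry completed=True, so testing `completed` first would
    report it as a success.
    """
    if status.get("status_str") == "error":
        return "error"
    if status.get("completed"):
        return "success"
    return "pending"


if __name__ == "__main__":
    # A failed run that is also marked completed must classify as an error.
    print(classify_status({"status_str": "error", "completed": True}))
```

Swapping the two `if` statements would reproduce the misclassification described in the first bullet.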

New scripts
- run_batch.py — count or sweep (cartesian product), parallel up to
  cloud tier limit
- ws_monitor.py — real-time WebSocket viewer; saves preview frames
- auto_fix_deps.py — runs comfy node install / model download for
  whatever check_deps reports missing (with --dry-run)
- health_check.py — single command that runs the verification checklist
  (comfy-cli + server + checkpoints + optional smoke test that cancels
  itself to avoid burning compute)
- fetch_logs.py — pull traceback / status messages for a prompt_id
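
As a rough sketch of what a cartesian-product sweep means for run_batch.py, the snippet below expands per-parameter value lists into one parameter set per run. `expand_sweep` and the parameter names are hypothetical and are not taken from the script itself.

```python
from __future__ import annotations

from itertools import product


def expand_sweep(sweep: dict[str, list]) -> list[dict]:
    """Expand {"param": [values, ...]} into one dict per combination.

    Hypothetical helper: every value of every parameter is paired with
    every value of the others (cartesian product).
    """
    names = list(sweep)
    value_lists = [sweep[name] for name in names]
    return [dict(zip(names, combo)) for combo in product(*value_lists)]


if __name__ == "__main__":
    for params in expand_sweep({"seed": [1, 2], "cfg": [6.0, 7.5]}):
        print(params)
```

Two seeds and two cfg values give four runs, so sweep sizes grow multiplicatively with each added parameter.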

Coverage expansion
- Param patterns now cover Flux (BasicScheduler, BasicGuider,
  RandomNoise, ModelSamplingFlux), SD3, Wan/Hunyuan/LTX video,
  IPAdapter, rgthree, easy-use, AnimateDiff
- Embedding refs in CLIPTextEncode strings extracted as model deps
- ckpt_name / vae_name / lora_name / unet_name now controllable so
  workflows can be retargeted per run
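
The embedding extraction mentioned above could look roughly like this. It assumes the `embedding:NAME` prompt syntax; the regular expression and the `find_embeddings` helper are illustrative guesses, not the pattern the skill actually uses.

```python
from __future__ import annotations

import re

# Assumed syntax: "embedding:<name>", optionally followed by ":<weight>".
# The colon is excluded from the name so a trailing weight is not captured.
EMBEDDING_RE = re.compile(r"embedding:([\w\-./\\]+)", re.IGNORECASE)


def find_embeddings(prompt_text: str) -> list[str]:
    """Return embedding names referenced in a prompt string, in order, deduplicated."""
    found: list[str] = []
    for name in EMBEDDING_RE.findall(prompt_text):
        name = name.rstrip(".")  # drop sentence-ending punctuation
        if name and name not in found:
            found.append(name)
    return found


if __name__ == "__main__":
    text = "a photo, embedding:EasyNegative, (embedding:bad-hands-5:1.2)"
    print(find_embeddings(text))
```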

Examples
- workflows/{sd15,sdxl,flux_dev}_txt2img.json
- workflows/sdxl_{img2img,inpaint}.json
- workflows/upscale_4x.json
- workflows/{animatediff_video,wan_video_t2v}.json + README

Tests
- 117 tests (105 unit + 8 cloud integration + 4 cross-host security)
- Cloud tests auto-skip without COMFY_CLOUD_API_KEY; verified end-to-end
  against live cloud API
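
One way to get the auto-skip behavior described above, shown with the standard library's unittest so the sketch stays dependency-free. The suite in this commit may use a different framework or mechanism; the class and test names here are hypothetical.

```python
import os
import unittest

# Cloud tests only make sense when a key is available in the environment.
HAS_CLOUD_KEY = bool(os.environ.get("COMFY_CLOUD_API_KEY"))


@unittest.skipUnless(HAS_CLOUD_KEY, "COMFY_CLOUD_API_KEY not set")
class CloudSmokeTest(unittest.TestCase):
    """Hypothetical cloud test; skipped as a whole when no key is set."""

    def test_key_is_present(self):
        self.assertTrue(os.environ["COMFY_CLOUD_API_KEY"])


if __name__ == "__main__":
    unittest.main()
```

Skipping at class level keeps the tests visible in the report as skipped rather than silently absent.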

Backwards compatibility
- All existing CLI flags continue to work; new behavior is opt-in
  (--ws, --input-image, --randomize-seed, --flat-output, etc.)
2026-04-29 20:48:01 -07:00

#!/usr/bin/env python3
"""
fetch_logs.py - Retrieve workflow execution diagnostics from a ComfyUI server.

When a workflow errors, the server's /history (local) or /jobs (cloud) entry
contains the full Python traceback. This script makes it easy to fetch by
prompt_id, with sensible formatting.

Usage:
    python3 fetch_logs.py <prompt_id>
    python3 fetch_logs.py <prompt_id> --host https://cloud.comfy.org
    python3 fetch_logs.py --tail-queue      # show currently queued/running jobs
"""

from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent))
from _common import (  # noqa: E402
    DEFAULT_LOCAL_HOST, ENV_API_KEY, emit_json, http_get, is_cloud_host,
    resolve_api_key, resolve_url,
)


def fetch_history_entry(host: str, headers: dict, prompt_id: str, *, is_cloud: bool) -> dict:
    if is_cloud:
        # Try /jobs/{id} first
        url = resolve_url(host, f"/jobs/{prompt_id}", is_cloud=True)
        r = http_get(url, headers=headers, retries=2, timeout=30)
        if r.status == 200:
            try:
                return {"ok": True, "entry": r.json(), "source": "/api/jobs"}
            except Exception:
                pass
        # Fallback to history_v2
        url = resolve_url(host, f"/history/{prompt_id}", is_cloud=True)
        r = http_get(url, headers=headers, retries=2, timeout=30)
        try:
            data = r.json()
        except Exception:
            data = None
        if r.status == 200 and data:
            return {"ok": True, "entry": data, "source": "/api/history_v2"}
        return {"ok": False, "http_status": r.status, "body": r.text()[:500]}

    url = resolve_url(host, f"/history/{prompt_id}", is_cloud=False)
    r = http_get(url, headers=headers, retries=2, timeout=30)
    if r.status != 200:
        return {"ok": False, "http_status": r.status, "body": r.text()[:500]}
    try:
        data = r.json()
    except Exception:
        return {"ok": False, "reason": "non-JSON response"}
    if not isinstance(data, dict) or prompt_id not in data:
        return {"ok": False, "reason": "prompt_id not found in history",
                "history_keys": list(data.keys())[:5] if isinstance(data, dict) else []}
    return {"ok": True, "entry": data[prompt_id], "source": "/history"}


def fetch_queue(host: str, headers: dict) -> dict:
    url = resolve_url(host, "/queue")
    r = http_get(url, headers=headers, retries=2, timeout=15)
    try:
        data = r.json()
    except Exception:
        data = {"raw": r.text()[:500]}
    return {"http_status": r.status, "data": data}


def extract_diagnostics(entry: dict) -> dict:
    """Pull out the parts a human cares about: status, errors, traceback, timing."""
    diag: dict = {}
    status = entry.get("status") or {}
    diag["status_str"] = status.get("status_str")
    diag["completed"] = status.get("completed")
    messages = status.get("messages") or []
    diag["execution_log"] = []
    for msg in messages:
        if isinstance(msg, list) and len(msg) >= 2:
            mtype, mdata = msg[0], msg[1]
            diag["execution_log"].append({"type": mtype, "data": mdata})
        else:
            diag["execution_log"].append(msg)

    # Look for execution_error inside messages
    errors = []
    for msg in messages:
        if isinstance(msg, list) and len(msg) >= 2 and msg[0] == "execution_error":
            errors.append(msg[1])
    if errors:
        diag["errors"] = errors

    # Cloud's /jobs response shape: top-level outputs / status / etc.
    if "outputs" in entry:
        out = entry["outputs"] or {}
        if isinstance(out, dict):
            diag["output_node_ids"] = list(out.keys())
            # Count file refs across all output buckets (images / video / etc.)
            total = 0
            for node_output in out.values():
                if not isinstance(node_output, dict):
                    continue
                for v in node_output.values():
                    if isinstance(v, list):
                        total += len(v)
            diag["output_count"] = total
        else:
            diag["output_node_ids"] = []
            diag["output_count"] = 0

    return diag


def main(argv: list[str] | None = None) -> int:
    p = argparse.ArgumentParser(description="Fetch workflow execution diagnostics")
    p.add_argument("prompt_id", nargs="?", help="prompt_id to look up")
    p.add_argument("--host", default=DEFAULT_LOCAL_HOST)
    p.add_argument("--api-key", help=f"or set ${ENV_API_KEY}")
    p.add_argument("--raw", action="store_true",
                   help="Print the full history entry instead of the digest")
    p.add_argument("--tail-queue", action="store_true",
                   help="Show currently running/pending jobs instead")
    args = p.parse_args(argv)

    api_key = resolve_api_key(args.api_key)
    headers = {"X-API-Key": api_key} if api_key else {}
    is_cloud = is_cloud_host(args.host)

    if args.tail_queue:
        emit_json(fetch_queue(args.host, headers))
        return 0

    if not args.prompt_id:
        print("Error: prompt_id is required (or use --tail-queue)", file=sys.stderr)
        return 1

    res = fetch_history_entry(args.host, headers, args.prompt_id, is_cloud=is_cloud)
    if not res.get("ok"):
        emit_json(res)
        return 1

    if args.raw:
        emit_json(res)
        return 0

    diag = extract_diagnostics(res["entry"])
    diag["source"] = res.get("source")
    diag["prompt_id"] = args.prompt_id
    emit_json(diag)
    return 0 if diag.get("status_str") not in ("error",) else 1


if __name__ == "__main__":
    sys.exit(main())
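
To make the input shape concrete, here is a small self-contained sketch of a history entry and the error-extraction step the script performs on it. The sample values are invented for illustration, the keys inside the `execution_error` payload (`node_id`, `exception_message`, `traceback`) are assumptions about the server's message format, and `execution_errors` is a condensed stand-in rather than the function defined in the script.

```python
from __future__ import annotations

# Illustrative entry only: real entries come from /history/<prompt_id>.
sample_entry = {
    "status": {
        "status_str": "error",
        "completed": False,
        "messages": [
            ["execution_start", {"prompt_id": "abc123"}],
            ["execution_error", {
                "node_id": "3",
                "exception_message": "example failure message",
                "traceback": ["Traceback (most recent call last):", "..."],
            }],
        ],
    },
    "outputs": {},
}


def execution_errors(entry: dict) -> list[dict]:
    """Collect the payload of every ["execution_error", payload] message."""
    messages = (entry.get("status") or {}).get("messages") or []
    return [
        msg[1]
        for msg in messages
        if isinstance(msg, list) and len(msg) >= 2 and msg[0] == "execution_error"
    ]


if __name__ == "__main__":
    for err in execution_errors(sample_entry):
        print(err["node_id"], err["exception_message"])
```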