hermes-agent/optional-skills/creative/comfyui/scripts/run_workflow.py
alt-glitch b81638d749 feat(comfyui): rewrite skill — official CLI + REST API, no third-party dependency
Complete rewrite of the ComfyUI skill to use:
- comfy-cli (official, Comfy-Org/comfy-cli) for lifecycle management:
  install, launch, stop, node management, model downloads
- Direct REST API + helper scripts for workflow execution:
  parameter injection, submission, monitoring, output download
- No dependency on comfyui-skill-cli or any unofficial tool

New files:
- SKILL.md: full rewrite with two-layer architecture, decision tree, pitfalls
- references/official-cli.md: complete comfy-cli command reference
- references/rest-api.md: all REST endpoints (local + cloud)
- references/workflow-format.md: API format spec, common nodes, param mapping
- scripts/extract_schema.py: analyze workflow → extract controllable params
- scripts/run_workflow.py: inject args, submit, poll, download outputs
- scripts/check_deps.py: check missing nodes/models against running server
- scripts/comfyui_setup.sh: full setup automation with official CLI

Removed:
- references/cli-reference.md (was for unofficial comfyui-skill-cli)
- references/api-notes.md (replaced by rest-api.md)

Addresses feedback from PR #17316 comment:
- Correct author attribution
- Remove references to unofficial OpenClaw project
- License field reflects hermes-agent repo (MIT)
2026-04-29 12:38:59 -07:00

352 lines
14 KiB
Python

#!/usr/bin/env python3
"""
run_workflow.py — Inject parameters into a ComfyUI workflow, submit it, monitor execution,
and download outputs.
Usage:
# Local server
python3 run_workflow.py --workflow workflow_api.json \
--args '{"prompt": "a cat", "seed": 42}' \
--output-dir ./outputs
# Cloud server
python3 run_workflow.py --workflow workflow_api.json \
--args '{"prompt": "a cat"}' \
--host https://cloud.comfy.org \
--api-key comfyui-xxxxxxx \
--output-dir ./outputs
# With schema file (pre-extracted)
python3 run_workflow.py --workflow workflow_api.json \
--schema schema.json \
--args '{"prompt": "a cat"}' \
--output-dir ./outputs
Requires: Python 3.10+, requests (or urllib as fallback)
"""
import json
import sys
import time
import uuid
import copy
import argparse
from pathlib import Path
from urllib.parse import urljoin, urlencode
try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
import urllib.request
import urllib.error
def http_get(url: str, headers: dict = None, follow_redirects: bool = True) -> tuple:
"""GET request, returns (status_code, body_bytes, response_headers)."""
if HAS_REQUESTS:
r = requests.get(url, headers=headers or {}, allow_redirects=follow_redirects, timeout=30)
return r.status_code, r.content, dict(r.headers)
else:
req = urllib.request.Request(url, headers=headers or {})
try:
resp = urllib.request.urlopen(req, timeout=30)
return resp.status, resp.read(), dict(resp.headers)
except urllib.error.HTTPError as e:
return e.code, e.read(), dict(e.headers)
def http_post(url: str, data: dict, headers: dict = None) -> tuple:
"""POST JSON request, returns (status_code, response_dict)."""
payload = json.dumps(data).encode()
hdrs = {"Content-Type": "application/json"}
if headers:
hdrs.update(headers)
if HAS_REQUESTS:
r = requests.post(url, json=data, headers=hdrs, timeout=30)
try:
return r.status_code, r.json()
except Exception:
return r.status_code, {"raw": r.text}
else:
req = urllib.request.Request(url, data=payload, headers=hdrs, method="POST")
try:
resp = urllib.request.urlopen(req, timeout=30)
return resp.status, json.loads(resp.read())
except urllib.error.HTTPError as e:
return e.code, json.loads(e.read())
class ComfyRunner:
def __init__(self, host: str = "http://127.0.0.1:8188", api_key: str = None):
self.host = host.rstrip("/")
self.api_key = api_key
self.is_cloud = "cloud.comfy.org" in self.host or api_key is not None
self.client_id = str(uuid.uuid4())
@property
def headers(self) -> dict:
h = {}
if self.api_key:
h["X-API-Key"] = self.api_key
return h
def api_url(self, path: str) -> str:
"""Build URL. Cloud uses /api prefix for some endpoints."""
if self.is_cloud and not path.startswith("/api"):
# Cloud endpoints: /api/prompt, /api/view, /api/job, /api/queue
return f"{self.host}/api{path}"
return f"{self.host}{path}"
def check_server(self) -> bool:
"""Check if server is reachable."""
try:
url = self.api_url("/system_stats") if not self.is_cloud else f"{self.host}/api/system_stats"
status, _, _ = http_get(url, self.headers)
return status == 200
except Exception:
return False
def submit(self, workflow: dict) -> dict:
"""Submit workflow for execution. Returns {prompt_id, node_errors}."""
payload = {"prompt": workflow, "client_id": self.client_id}
if self.api_key and self.is_cloud:
payload.setdefault("extra_data", {})["api_key_comfy_org"] = self.api_key
url = self.api_url("/prompt")
status, resp = http_post(url, payload, self.headers)
if status != 200:
return {"error": f"HTTP {status}", "details": resp}
return resp
def poll_status(self, prompt_id: str, timeout: int = 120) -> dict:
"""Poll until job completes. Returns final status dict."""
start = time.time()
poll_interval = 2.0
while time.time() - start < timeout:
if self.is_cloud:
# Cloud has a dedicated status endpoint
url = f"{self.host}/api/job/{prompt_id}/status"
status, body, _ = http_get(url, self.headers)
if status == 200:
data = json.loads(body) if isinstance(body, bytes) else body
job_status = data.get("status", "unknown")
if job_status == "completed":
return {"status": "success", "data": data}
elif job_status == "failed":
return {"status": "error", "data": data}
elif job_status == "cancelled":
return {"status": "cancelled", "data": data}
# still running, continue polling
else:
# Local: check /history/{prompt_id}
url = f"{self.host}/history/{prompt_id}"
status, body, _ = http_get(url, self.headers)
if status == 200:
data = json.loads(body) if isinstance(body, bytes) else body
if prompt_id in data:
entry = data[prompt_id]
if entry.get("status", {}).get("completed", False):
return {"status": "success", "outputs": entry.get("outputs", {})}
if entry.get("status", {}).get("status_str") == "error":
return {"status": "error", "data": entry}
time.sleep(poll_interval)
poll_interval = min(poll_interval * 1.2, 10.0)
return {"status": "timeout", "elapsed": time.time() - start}
def get_outputs(self, prompt_id: str) -> dict:
"""Get output file info from history."""
if self.is_cloud:
url = f"{self.host}/api/job/{prompt_id}/status"
else:
url = f"{self.host}/history/{prompt_id}"
status, body, _ = http_get(url, self.headers)
if status != 200:
return {}
data = json.loads(body) if isinstance(body, bytes) else body
if self.is_cloud:
return data.get("outputs", {})
if prompt_id in data:
return data[prompt_id].get("outputs", {})
return {}
def download_output(self, filename: str, subfolder: str, file_type: str, output_dir: Path) -> Path:
"""Download a single output file."""
params = urlencode({"filename": filename, "subfolder": subfolder, "type": file_type})
url = self.api_url(f"/view?{params}")
status, body, _ = http_get(url, self.headers, follow_redirects=True)
if status != 200:
raise RuntimeError(f"Failed to download {filename}: HTTP {status}")
out_path = output_dir / filename
out_path.write_bytes(body)
return out_path
def load_schema(schema_path: str = None, workflow: dict = None) -> dict:
"""Load or generate parameter schema."""
if schema_path:
with open(schema_path) as f:
return json.load(f)
# Inline extraction (same logic as extract_schema.py but simplified)
if workflow is None:
return {"parameters": {}}
# Import from sibling script
script_dir = Path(__file__).parent
sys.path.insert(0, str(script_dir))
from extract_schema import extract_schema
return extract_schema(workflow)
def inject_params(workflow: dict, schema: dict, args: dict) -> dict:
"""Inject user parameters into workflow based on schema mapping."""
wf = copy.deepcopy(workflow)
params = schema.get("parameters", {})
for param_name, value in args.items():
if param_name not in params:
print(f"Warning: unknown parameter '{param_name}', skipping", file=sys.stderr)
continue
mapping = params[param_name]
node_id = mapping["node_id"]
field = mapping["field"]
if node_id in wf and "inputs" in wf[node_id]:
wf[node_id]["inputs"][field] = value
else:
print(f"Warning: node {node_id} not found in workflow", file=sys.stderr)
return wf
def main():
parser = argparse.ArgumentParser(description="Run a ComfyUI workflow with parameter injection")
parser.add_argument("--workflow", required=True, help="Path to workflow API JSON file")
parser.add_argument("--args", default="{}", help="JSON parameters to inject")
parser.add_argument("--schema", help="Path to schema JSON (from extract_schema.py). Auto-generated if omitted.")
parser.add_argument("--host", default="http://127.0.0.1:8188", help="ComfyUI server URL")
parser.add_argument("--api-key", help="API key for cloud (X-API-Key)")
parser.add_argument("--output-dir", default="./outputs", help="Directory to save outputs")
parser.add_argument("--timeout", type=int, default=120, help="Max seconds to wait for completion")
parser.add_argument("--no-download", action="store_true", help="Skip downloading outputs")
parser.add_argument("--submit-only", action="store_true", help="Submit and return prompt_id without waiting")
args = parser.parse_args()
# Load workflow
workflow_path = Path(args.workflow)
if not workflow_path.exists():
print(json.dumps({"error": f"Workflow file not found: {args.workflow}"}))
sys.exit(1)
with open(workflow_path) as f:
workflow = json.load(f)
# Validate format
if "nodes" in workflow and "links" in workflow:
print(json.dumps({"error": "Workflow is in editor format, not API format. Re-export with 'Save (API Format)'."}))
sys.exit(1)
# Parse user args
try:
user_args = json.loads(args.args)
except json.JSONDecodeError as e:
print(json.dumps({"error": f"Invalid --args JSON: {e}"}))
sys.exit(1)
# Load/generate schema and inject params
schema = load_schema(args.schema, workflow)
if user_args:
workflow = inject_params(workflow, schema, user_args)
# Connect to server
runner = ComfyRunner(host=args.host, api_key=args.api_key)
# Check server
if not runner.check_server():
print(json.dumps({"error": f"Cannot reach server at {args.host}. Is ComfyUI running?"}))
sys.exit(1)
# Submit
result = runner.submit(workflow)
if "error" in result:
print(json.dumps({"error": "Submission failed", "details": result}))
sys.exit(1)
prompt_id = result.get("prompt_id")
if not prompt_id:
print(json.dumps({"error": "No prompt_id in response", "response": result}))
sys.exit(1)
# Check for node errors
node_errors = result.get("node_errors", {})
if node_errors:
print(json.dumps({"error": "Workflow validation failed", "node_errors": node_errors}))
sys.exit(1)
if args.submit_only:
print(json.dumps({"status": "submitted", "prompt_id": prompt_id}))
sys.exit(0)
# Poll for completion
print(f"Submitted: {prompt_id}. Waiting...", file=sys.stderr)
poll_result = runner.poll_status(prompt_id, timeout=args.timeout)
if poll_result["status"] == "timeout":
print(json.dumps({"status": "timeout", "prompt_id": prompt_id, "elapsed": poll_result["elapsed"]}))
sys.exit(1)
elif poll_result["status"] == "error":
print(json.dumps({"status": "error", "prompt_id": prompt_id, "details": poll_result.get("data")}))
sys.exit(1)
elif poll_result["status"] == "cancelled":
print(json.dumps({"status": "cancelled", "prompt_id": prompt_id}))
sys.exit(1)
# Download outputs
outputs = poll_result.get("outputs") or runner.get_outputs(prompt_id)
if args.no_download:
print(json.dumps({"status": "success", "prompt_id": prompt_id, "outputs": outputs}))
sys.exit(0)
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
downloaded = []
for node_id, node_output in outputs.items():
# ComfyUI puts images/videos under "images" key (even for video)
for key in ("images", "gifs", "videos", "audio"):
if key not in node_output:
continue
for file_info in node_output[key]:
filename = file_info.get("filename", "")
subfolder = file_info.get("subfolder", "")
file_type = file_info.get("type", "output")
if not filename:
continue
try:
out_path = runner.download_output(filename, subfolder, file_type, output_dir)
# Detect media type from extension
ext = Path(filename).suffix.lower()
if ext in (".mp4", ".webm", ".avi", ".mov", ".gif"):
media_type = "video"
elif ext in (".wav", ".mp3", ".flac", ".ogg"):
media_type = "audio"
else:
media_type = "image"
downloaded.append({
"file": str(out_path),
"node_id": node_id,
"type": media_type,
"filename": filename,
})
except Exception as e:
print(f"Warning: failed to download {filename}: {e}", file=sys.stderr)
print(json.dumps({
"status": "success",
"prompt_id": prompt_id,
"outputs": downloaded,
}, indent=2))
if __name__ == "__main__":
main()