feat(skills): move comfyui from optional to built-in (#17631)

Intended placement per PR #17610 discussion — comfyui belongs in
skills/creative/ alongside other creative built-ins (touchdesigner-mcp,
pretext, sketch), not in optional-skills/.

Pure directory rename, no content changes. History preserved via git mv.
This commit is contained in:
Teknium 2026-04-29 14:09:17 -07:00 committed by GitHub
parent 456955c2e4
commit 4899bd99c0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 0 additions and 0 deletions

View file

@ -0,0 +1,182 @@
#!/usr/bin/env python3
"""
check_deps.py Check if a ComfyUI workflow's dependencies (custom nodes and models) are installed.
Queries the running ComfyUI server for installed nodes (via /object_info) and models
(via /models/{folder}), then diffs against what the workflow requires.
Usage:
python3 check_deps.py workflow_api.json
python3 check_deps.py workflow_api.json --host 127.0.0.1 --port 8188
python3 check_deps.py workflow_api.json --host https://cloud.comfy.org --api-key KEY
Output format:
{
"is_ready": true/false,
"missing_nodes": ["NodeClassName", ...],
"missing_models": [{"class_type": "...", "field": "...", "value": "...", "folder": "..."}],
"installed_nodes_count": 123,
"required_nodes": ["KSampler", "CLIPTextEncode", ...]
}
Requires: Python 3.10+, requests (or urllib as fallback)
"""
import json
import sys
import argparse
from pathlib import Path
from urllib.parse import urljoin, urlparse
try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
import urllib.request
import urllib.error
# Known model loader node types and which folder they reference.
# Maps class_type -> (input field holding the model filename, server model folder).
# The folder string matches ComfyUI's /models/{folder} listing endpoint, which
# check_deps() queries to see whether the referenced file is installed.
MODEL_LOADERS = {
    "CheckpointLoaderSimple": ("ckpt_name", "checkpoints"),
    "CheckpointLoader": ("ckpt_name", "checkpoints"),
    "unCLIPCheckpointLoader": ("ckpt_name", "checkpoints"),
    "LoraLoader": ("lora_name", "loras"),
    "LoraLoaderModelOnly": ("lora_name", "loras"),
    "VAELoader": ("vae_name", "vae"),
    "ControlNetLoader": ("control_net_name", "controlnet"),
    "DiffControlNetLoader": ("control_net_name", "controlnet"),
    "CLIPLoader": ("clip_name", "clip"),
    "DualCLIPLoader": ("clip_name1", "clip"),  # NOTE(review): only the first clip slot is checked
    "UNETLoader": ("unet_name", "unet"),
    "DiffusionModelLoader": ("model_name", "diffusion_models"),
    "UpscaleModelLoader": ("model_name", "upscale_models"),
    "CLIPVisionLoader": ("clip_name", "clip_vision"),
    "StyleModelLoader": ("style_model_name", "style_models"),
    "GLIGENLoader": ("gligen_name", "gligen"),
    "HypernetworkLoader": ("hypernetwork_name", "hypernetworks"),
}
def http_get(url: str, headers: dict | None = None) -> tuple:
    """GET request, returns (status_code, body_text).

    Network-level failures (connection refused, DNS failure, timeout) are
    reported as status 0 with the error text as the body instead of raising,
    so check_deps() can surface its friendly "Is ComfyUI running?" message
    rather than crashing with a traceback.
    """
    if HAS_REQUESTS:
        try:
            r = requests.get(url, headers=headers or {}, timeout=30)
        except requests.RequestException as e:
            return 0, str(e)
        return r.status_code, r.text
    else:
        req = urllib.request.Request(url, headers=headers or {})
        try:
            resp = urllib.request.urlopen(req, timeout=30)
            return resp.status, resp.read().decode()
        except urllib.error.HTTPError as e:
            # HTTP errors still carry a readable body (status + text).
            return e.code, e.read().decode()
        except (urllib.error.URLError, OSError) as e:
            return 0, str(e)
def check_deps(workflow_path: str, host: str = "http://127.0.0.1:8188", api_key: str = None):
    """Check workflow dependencies against a running server.

    Args:
        workflow_path: Path to an API-format workflow JSON file.
        host: Base URL of the ComfyUI server.
        api_key: Optional key sent as X-API-Key; supplying one also switches
            on cloud-style routing (see is_cloud below).

    Returns:
        A dict with is_ready / missing_nodes / missing_models /
        installed_nodes_count / required_nodes, or {"error": ...} when the
        workflow has the wrong format or the server is unreachable.
    """
    # Load workflow
    with open(workflow_path) as f:
        workflow = json.load(f)
    # Validate format: editor exports carry top-level "nodes"/"links" arrays,
    # API exports map node-id strings to node dicts.
    if "nodes" in workflow and "links" in workflow:
        return {"error": "Workflow is in editor format, not API format."}
    headers = {}
    if api_key:
        headers["X-API-Key"] = api_key
    parsed_host = urlparse(host)
    hostname = (parsed_host.hostname or "").lower()
    is_cloud_host = hostname == "cloud.comfy.org" or hostname.endswith(".cloud.comfy.org")
    # Any API key implies cloud routing, even against a custom hostname.
    is_cloud = is_cloud_host or api_key is not None
    base = host.rstrip("/")
    # Get installed node types (cloud serves routes under an /api prefix)
    object_info_url = f"{base}/api/object_info" if is_cloud else f"{base}/object_info"
    status, body = http_get(object_info_url, headers)
    if status != 200:
        return {"error": f"Cannot reach server at {host}. Is ComfyUI running? HTTP {status}"}
    installed_nodes = set(json.loads(body).keys())
    # Find required node types from workflow
    required_nodes = set()
    for node_id, node in workflow.items():
        if isinstance(node, dict) and "class_type" in node:
            required_nodes.add(node["class_type"])
    missing_nodes = sorted(required_nodes - installed_nodes)
    # Check model dependencies
    missing_models = []
    model_cache = {}  # folder → set of installed model filenames
    for node_id, node in workflow.items():
        if not isinstance(node, dict) or "class_type" not in node:
            continue
        class_type = node["class_type"]
        if class_type not in MODEL_LOADERS:
            continue
        field, folder = MODEL_LOADERS[class_type]
        inputs = node.get("inputs", {})
        model_name = inputs.get(field)
        # Linked inputs arrive as lists ([node_id, slot]); only literal
        # filename strings count as a model dependency.
        if not model_name or not isinstance(model_name, str):
            continue
        # Fetch installed models for this folder (cached: one request per folder)
        if folder not in model_cache:
            models_url = f"{base}/api/models/{folder}" if is_cloud else f"{base}/models/{folder}"
            s, b = http_get(models_url, headers)
            if s == 200:
                model_cache[folder] = set(json.loads(b))
            else:
                # Listing unavailable → treat as empty so the model gets flagged.
                model_cache[folder] = set()
        if model_name not in model_cache[folder]:
            missing_models.append({
                "node_id": node_id,
                "class_type": class_type,
                "field": field,
                "value": model_name,
                "folder": folder,
            })
    is_ready = len(missing_nodes) == 0 and len(missing_models) == 0
    return {
        "is_ready": is_ready,
        "missing_nodes": missing_nodes,
        "missing_models": missing_models,
        "installed_nodes_count": len(installed_nodes),
        "required_nodes": sorted(required_nodes),
    }
def main():
    """CLI entry point: parse arguments, run the check, print the JSON report.

    Exits 0 only when the workflow is fully satisfied; any error or missing
    dependency exits 1 so shell callers can branch on the status.
    """
    ap = argparse.ArgumentParser(description="Check ComfyUI workflow dependencies")
    ap.add_argument("workflow", help="Path to workflow API JSON file")
    ap.add_argument("--host", default="http://127.0.0.1:8188", help="ComfyUI server URL")
    ap.add_argument("--port", type=int, help="Server port (overrides --host port)")
    ap.add_argument("--api-key", help="API key for cloud")
    opts = ap.parse_args()
    # Apply --port only when the authority part of the URL has no port yet.
    target = opts.host
    already_has_port = ":" in target.split("//")[-1]
    if opts.port and not already_has_port:
        target = f"{target}:{opts.port}"
    result = check_deps(opts.workflow, host=target, api_key=opts.api_key)
    print(json.dumps(result, indent=2))
    succeeded = not result.get("error") and result.get("is_ready", False)
    sys.exit(0 if succeeded else 1)
if __name__ == "__main__":
    main()

View file

@ -0,0 +1,131 @@
#!/usr/bin/env bash
# ComfyUI Setup — Install, launch, and verify using the official comfy-cli.
# Usage: bash scripts/comfyui_setup.sh [--nvidia|--amd|--m-series|--cpu]
#
# If no flag is passed, runs hardware_check.py to detect the right one
# automatically, and refuses to install locally when the verdict is "cloud"
# (no usable GPU, too little VRAM, Intel Mac, etc.) — pointing the user
# at Comfy Cloud instead.
#
# Prerequisites: Python 3.10+, pip
# What it does:
#   0. Hardware check (skipped if a flag was passed explicitly)
#   1. Installs comfy-cli (if not present)
#   2. Disables analytics tracking
#   3. Installs ComfyUI + ComfyUI-Manager
#   4. Launches server in background
#   5. Verifies server is reachable
# Exit codes: 0 = success, 1 = install/launch failure, 2 = hardware verdict "cloud".
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
HARDWARE_CHECK="$SCRIPT_DIR/hardware_check.py"
# Step 0: Hardware check (auto-detect GPU flag when none was provided)
if [ $# -ge 1 ]; then
  GPU_FLAG="$1"
  echo "==> GPU flag: $GPU_FLAG (user-supplied, skipping hardware check)"
else
  if [ ! -f "$HARDWARE_CHECK" ]; then
    echo "==> hardware_check.py not found, defaulting to --nvidia"
    GPU_FLAG="--nvidia"
  else
    echo "==> Running hardware check..."
    # hardware_check.py exits non-zero for marginal/cloud verdicts, so drop
    # -e around it to capture the JSON regardless of exit status.
    # NOTE(review): HW_EXIT is captured but never read — verdict is taken
    # from the JSON instead.
    set +e
    HW_JSON="$(python3 "$HARDWARE_CHECK" --json)"
    HW_EXIT=$?
    set -e
    echo "$HW_JSON"
    echo ""
    VERDICT="$(echo "$HW_JSON" | python3 -c 'import sys,json; print(json.load(sys.stdin).get("verdict",""))')"
    FLAG="$(echo "$HW_JSON" | python3 -c 'import sys,json; print(json.load(sys.stdin).get("comfy_cli_flag") or "")')"
    if [ "$VERDICT" = "cloud" ]; then
      echo ""
      echo "==> Hardware check: this machine is not suitable for local ComfyUI."
      echo "    Recommended: Comfy Cloud — https://platform.comfy.org"
      echo ""
      echo "    If you want to override and install anyway, re-run with an"
      echo "    explicit flag: bash $0 --nvidia|--amd|--m-series|--cpu"
      exit 2
    fi
    if [ -z "$FLAG" ]; then
      echo "==> Hardware check couldn't pick a comfy-cli flag. Defaulting to --nvidia."
      echo "    (For Intel Arc or unsupported hardware, use the manual install path.)"
      GPU_FLAG="--nvidia"
    else
      GPU_FLAG="$FLAG"
    fi
    if [ "$VERDICT" = "marginal" ]; then
      echo "==> Hardware check: verdict is MARGINAL."
      echo "    SD1.5 should work; SDXL/Flux may be slow or OOM."
      echo "    Consider Comfy Cloud for heavier workflows: https://platform.comfy.org"
      echo ""
    fi
  fi
fi
echo "==> ComfyUI Setup"
echo "    GPU flag: $GPU_FLAG"
echo ""
# Step 1: Install comfy-cli
if command -v comfy >/dev/null 2>&1; then
  echo "==> comfy-cli already installed: $(comfy -v 2>/dev/null || echo 'unknown version')"
else
  echo "==> Installing comfy-cli..."
  pip install comfy-cli
fi
# Step 2: Disable tracking (avoid interactive prompt)
echo "==> Disabling analytics tracking..."
comfy --skip-prompt tracking disable 2>/dev/null || true
# Step 3: Install ComfyUI (skipped when `comfy which` already resolves a path)
if comfy which 2>/dev/null | grep -q "ComfyUI"; then
  echo "==> ComfyUI already installed at: $(comfy which 2>/dev/null)"
else
  echo "==> Installing ComfyUI ($GPU_FLAG)..."
  # GPU_FLAG is deliberately unquoted: it is a single option token.
  comfy --skip-prompt install $GPU_FLAG
fi
# Step 4: Launch in background
echo "==> Launching ComfyUI in background..."
comfy launch --background 2>/dev/null || {
  echo "==> Background launch failed. Trying foreground check..."
  echo "    You may need to run: comfy launch"
  exit 1
}
# Step 5: Wait for server to be ready (poll /system_stats every 2s, up to 30s)
echo "==> Waiting for server..."
MAX_WAIT=30
ELAPSED=0
while [ $ELAPSED -lt $MAX_WAIT ]; do
  if curl -s http://127.0.0.1:8188/system_stats >/dev/null 2>&1; then
    echo "==> Server is running!"
    curl -s http://127.0.0.1:8188/system_stats | python3 -m json.tool 2>/dev/null || true
    break
  fi
  sleep 2
  ELAPSED=$((ELAPSED + 2))
done
if [ $ELAPSED -ge $MAX_WAIT ]; then
  echo "==> Server did not start within ${MAX_WAIT}s."
  echo "    Check logs with: comfy launch (foreground) to see errors."
  exit 1
fi
echo ""
echo "==> Setup complete!"
echo "    Server: http://127.0.0.1:8188"
echo "    Web UI: http://127.0.0.1:8188 (open in browser)"
echo "    Stop:   comfy stop"
echo ""
echo "    Next steps:"
echo "    - Download a model: comfy model download --url <URL> --relative-path models/checkpoints"
echo "    - Run a workflow:   python3 scripts/run_workflow.py --workflow <file.json> --args '{...}'"

View file

@ -0,0 +1,212 @@
#!/usr/bin/env python3
"""
extract_schema.py Analyze a ComfyUI API-format workflow and extract controllable parameters.
Reads a workflow JSON, identifies user-facing parameters (prompts, seed, dimensions, etc.)
by scanning node types and field names, and outputs a schema mapping.
Usage:
python3 extract_schema.py workflow_api.json
python3 extract_schema.py workflow_api.json --output schema.json
Output format:
{
"parameters": {
"prompt": {"node_id": "6", "field": "text", "type": "string", "value": "..."},
"seed": {"node_id": "3", "field": "seed", "type": "int", "value": 42},
...
},
"output_nodes": ["9"],
"model_dependencies": [
{"node_id": "4", "class_type": "CheckpointLoaderSimple", "field": "ckpt_name", "value": "..."}
]
}
Requires: Python 3.10+ (stdlib only)
"""
import json
import sys
import argparse
from pathlib import Path
# Known parameter patterns: (class_type, field_name) → friendly_name.
# extract_schema() surfaces an entry only when the node's input holds a
# literal value (linked inputs are skipped).
PARAM_PATTERNS = [
    # Prompts
    ("CLIPTextEncode", "text", "prompt"),
    ("CLIPTextEncodeSDXL", "text_g", "prompt"),
    ("CLIPTextEncodeSDXL", "text_l", "prompt_l"),
    # Sampling
    ("KSampler", "seed", "seed"),
    ("KSampler", "steps", "steps"),
    ("KSampler", "cfg", "cfg"),
    ("KSampler", "sampler_name", "sampler_name"),
    ("KSampler", "scheduler", "scheduler"),
    ("KSampler", "denoise", "denoise"),
    ("KSamplerAdvanced", "noise_seed", "seed"),
    ("KSamplerAdvanced", "steps", "steps"),
    ("KSamplerAdvanced", "cfg", "cfg"),
    ("KSamplerAdvanced", "sampler_name", "sampler_name"),
    ("KSamplerAdvanced", "scheduler", "scheduler"),
    # Dimensions
    ("EmptyLatentImage", "width", "width"),
    ("EmptyLatentImage", "height", "height"),
    ("EmptyLatentImage", "batch_size", "batch_size"),
    # Image input
    ("LoadImage", "image", "image"),
    ("LoadImageMask", "image", "mask_image"),
    # LoRA
    ("LoraLoader", "lora_name", "lora_name"),
    ("LoraLoader", "strength_model", "lora_strength"),
    # Output
    ("SaveImage", "filename_prefix", "filename_prefix"),
]
# Node types that produce output files
OUTPUT_NODES = {"SaveImage", "PreviewImage", "VHS_VideoCombine", "SaveAudio", "SaveAnimatedWEBP", "SaveAnimatedPNG"}
# Node types that load models (for dependency checking).
# Maps class_type -> (input field with the model filename, server model folder).
MODEL_LOADERS = {
    "CheckpointLoaderSimple": ("ckpt_name", "checkpoints"),
    "CheckpointLoader": ("ckpt_name", "checkpoints"),
    "LoraLoader": ("lora_name", "loras"),
    "LoraLoaderModelOnly": ("lora_name", "loras"),
    "VAELoader": ("vae_name", "vae"),
    "ControlNetLoader": ("control_net_name", "controlnet"),
    "CLIPLoader": ("clip_name", "clip"),
    "DualCLIPLoader": ("clip_name1", "clip"),
    "UNETLoader": ("unet_name", "unet"),
    "DiffusionModelLoader": ("model_name", "diffusion_models"),
    "UpscaleModelLoader": ("model_name", "upscale_models"),
    "CLIPVisionLoader": ("clip_name", "clip_vision"),
}
def validate_api_format(workflow: dict) -> bool:
    """Check if workflow is in API format (not editor format).

    Editor exports carry top-level "nodes"/"links" arrays; API exports map
    node-id strings to dicts that each contain a "class_type" key.
    """
    looks_like_editor_export = "nodes" in workflow and "links" in workflow
    if looks_like_editor_export:
        return False
    return any(
        isinstance(entry, dict) and "class_type" in entry
        for entry in workflow.values()
    )
def infer_type(value) -> str:
    """Infer JSON schema type from a Python value."""
    # bool must be checked before int: bool is an int subclass in Python.
    checks = (
        (bool, "bool"),
        (int, "int"),
        (float, "float"),
        (str, "string"),
        (list, "link"),  # lists encode connections to other nodes
    )
    for py_type, label in checks:
        if isinstance(value, py_type):
            return label
    return "unknown"
def extract_schema(workflow: dict) -> dict:
    """Extract controllable parameters from a workflow.

    Scans each node against PARAM_PATTERNS for literal (non-linked) inputs,
    records output-producing nodes and model-loader dependencies, and returns
    the schema dict described in the module docstring.
    """
    parameters = {}
    output_nodes = []
    model_deps = []
    name_counts = {}  # track duplicate friendly names
    for node_id, node in workflow.items():
        if not isinstance(node, dict) or "class_type" not in node:
            continue
        class_type = node["class_type"]
        inputs = node.get("inputs", {})
        meta_title = node.get("_meta", {}).get("title", "")
        # Check if this is an output node
        if class_type in OUTPUT_NODES:
            output_nodes.append(node_id)
        # Check if this is a model loader (only literal filenames count)
        if class_type in MODEL_LOADERS:
            field, folder = MODEL_LOADERS[class_type]
            if field in inputs and isinstance(inputs[field], str):
                model_deps.append({
                    "node_id": node_id,
                    "class_type": class_type,
                    "field": field,
                    "value": inputs[field],
                    "folder": folder,
                })
        # Extract controllable parameters
        for pattern_class, pattern_field, friendly_name in PARAM_PATTERNS:
            if class_type != pattern_class:
                continue
            if pattern_field not in inputs:
                continue
            value = inputs[pattern_field]
            val_type = infer_type(value)
            if val_type == "link":
                continue  # skip linked inputs — not directly controllable
            # Disambiguate duplicate friendly names.
            # Use the node's editor title as a hint: a prompt node titled
            # e.g. "Negative Prompt" becomes the negative_prompt parameter.
            actual_name = friendly_name
            if friendly_name == "prompt" and meta_title:
                title_lower = meta_title.lower()
                if "negative" in title_lower or "neg" in title_lower:
                    actual_name = "negative_prompt"
            # Handle remaining duplicates by appending node_id; the first
            # occurrence keeps the plain name, later ones get suffixed.
            if actual_name in name_counts:
                name_counts[actual_name] += 1
                actual_name = f"{actual_name}_{node_id}"
            else:
                name_counts[actual_name] = 1
            parameters[actual_name] = {
                "node_id": node_id,
                "field": pattern_field,
                "type": val_type,
                "value": value,
            }
    return {
        "parameters": parameters,
        "output_nodes": output_nodes,
        "model_dependencies": model_deps,
    }
def main():
    """CLI entry point: load a workflow file, extract its schema, emit JSON."""
    ap = argparse.ArgumentParser(description="Extract controllable parameters from a ComfyUI workflow")
    ap.add_argument("workflow", help="Path to workflow API JSON file")
    ap.add_argument("--output", "-o", help="Output file (default: stdout)")
    opts = ap.parse_args()
    wf_path = Path(opts.workflow)
    if not wf_path.exists():
        print(f"Error: {wf_path} not found", file=sys.stderr)
        sys.exit(1)
    with open(wf_path) as fh:
        workflow = json.load(fh)
    if not validate_api_format(workflow):
        print("Error: Workflow is in editor format, not API format.", file=sys.stderr)
        print("Re-export from ComfyUI using 'Save (API Format)' button.", file=sys.stderr)
        sys.exit(1)
    rendered = json.dumps(extract_schema(workflow), indent=2)
    # Status messages go to stderr so stdout stays machine-parseable.
    if opts.output:
        Path(opts.output).write_text(rendered)
        print(f"Schema written to {opts.output}", file=sys.stderr)
    else:
        print(rendered)
if __name__ == "__main__":
    main()

View file

@ -0,0 +1,311 @@
#!/usr/bin/env python3
"""Detect whether this machine can realistically run ComfyUI locally.
Emits a structured JSON report the agent can read to decide whether to:
- help the user install ComfyUI locally, or
- steer them to Comfy Cloud instead.
Usage:
python3 hardware_check.py [--json]
Exit code:
0 "ok" can run local ComfyUI at reasonable speed
1 "marginal" technically works but slow / memory-tight
2 "cloud" local is not viable, recommend Comfy Cloud
The JSON report always prints to stdout regardless of exit code.
Output fields the agent should read:
verdict: "ok" | "marginal" | "cloud"
recommended_install_path: "nvidia" | "amd" | "apple-silicon" | "intel" | "comfy-cloud"
comfy_cli_flag: "--nvidia" | "--amd" | "--m-series" | None
(pass directly to `comfy install` when verdict != cloud)
gpu: detected GPU info or null
notes: list of human-readable strings to surface to the user
"""
from __future__ import annotations
import json
import os
import platform
import re
import shutil
import subprocess
import sys
# Rough thresholds. SDXL/Flux need real VRAM; SD1.5 will scrape by on 6GB.
# Apple Silicon shares RAM with GPU — unified memory budget is total RAM.
# All values are GB and consumed by classify() below.
MIN_VRAM_GB_USABLE = 6   # below this, most modern models won't load
OK_VRAM_GB = 8           # SDXL fits comfortably here
GREAT_VRAM_GB = 12       # Flux / video models start being realistic
MIN_MAC_RAM_GB = 16      # Apple Silicon unified memory; below = pain
OK_MAC_RAM_GB = 32       # smooth for SDXL / most workflows
def _run(cmd: list[str], timeout: int = 5) -> str:
try:
out = subprocess.run(
cmd, capture_output=True, text=True, timeout=timeout, check=False
)
return (out.stdout or "") + (out.stderr or "")
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
return ""
def detect_nvidia() -> dict | None:
    """Probe nvidia-smi for GPU name, VRAM and driver version.

    Returns None when nvidia-smi is absent or produces no parseable output.
    """
    if shutil.which("nvidia-smi") is None:
        return None
    raw = _run([
        "nvidia-smi",
        "--query-gpu=name,memory.total,driver_version",
        "--format=csv,noheader,nounits",
    ]).strip()
    if not raw:
        return None
    # Only the first GPU line is considered.
    fields = [piece.strip() for piece in raw.splitlines()[0].split(",")]
    if len(fields) < 2:
        return None
    try:
        vram_mb = int(fields[1])
    except ValueError:
        vram_mb = 0
    return {
        "vendor": "nvidia",
        "name": fields[0],
        "vram_gb": round(vram_mb / 1024, 1),
        "driver": fields[2] if len(fields) > 2 else "",
    }
def detect_rocm() -> dict | None:
    """Probe rocm-smi for an AMD GPU's name and VRAM size.

    Returns None when rocm-smi is absent or emits nothing.
    """
    if shutil.which("rocm-smi") is None:
        return None
    out = _run(["rocm-smi", "--showproductname", "--showmeminfo", "vram"])
    if not out.strip():
        return None
    name_match = re.search(r"Card series:\s*(.+)", out)
    mem_match = re.search(r"VRAM Total Memory \(B\):\s*(\d+)", out)
    vram_gb = round(int(mem_match.group(1)) / (1024**3), 1) if mem_match else 0.0
    gpu_name = name_match.group(1).strip() if name_match else "AMD GPU"
    return {
        "vendor": "amd",
        "name": gpu_name,
        "vram_gb": vram_gb,
        "driver": "rocm",
    }
def detect_apple_silicon() -> dict | None:
    """Detect an Apple Silicon chip (arm64 macOS) and its unified memory."""
    # Anything other than arm64 macOS (including Intel Macs) is out of scope.
    if platform.system() != "Darwin" or platform.machine() != "arm64":
        return None
    chip = _run(["sysctl", "-n", "machdep.cpu.brand_string"]).strip()
    # Brand strings look like "Apple M1 Pro" / "Apple M2 Max" / "Apple M3 Ultra".
    gen_match = re.search(r"Apple M(\d+)", chip)
    generation = int(gen_match.group(1)) if gen_match else 1
    try:
        mem_bytes = int(_run(["sysctl", "-n", "hw.memsize"]).strip() or 0)
    except ValueError:
        mem_bytes = 0
    return {
        "vendor": "apple",
        "name": chip or "Apple Silicon",
        "generation": generation,
        "unified_memory_gb": round(mem_bytes / (1024**3), 1) if mem_bytes else 0.0,
    }
def detect_intel_arc() -> dict | None:
    """Best-effort Intel Arc/Xe detection via clinfo (Linux only).

    VRAM is reported as 0.0 — clinfo output is not parsed for memory here.
    """
    if platform.system() != "Linux" or not shutil.which("clinfo"):
        return None
    listing = _run(["clinfo", "--list"])
    found_intel_gpu = "Intel" in listing and ("Arc" in listing or "Xe" in listing)
    if not found_intel_gpu:
        return None
    return {"vendor": "intel", "name": "Intel Arc/Xe", "vram_gb": 0.0}
def total_system_ram_gb() -> float:
    """Total physical RAM in GB, or 0.0 when it cannot be determined."""
    def _darwin() -> float:
        # sysctl reports bytes; empty output falls back to 0.
        try:
            return round(int(_run(["sysctl", "-n", "hw.memsize"]).strip() or 0) / (1024**3), 1)
        except ValueError:
            return 0.0

    def _linux() -> float:
        # /proc/meminfo reports MemTotal in kB.
        try:
            with open("/proc/meminfo", "r") as fh:
                for line in fh:
                    if line.startswith("MemTotal:"):
                        return round(int(line.split()[1]) / (1024**2), 1)
        except OSError:
            pass
        return 0.0

    def _windows() -> float:
        # wmic reports bytes; grab the first long digit run.
        m = re.search(r"(\d{6,})", _run(["wmic", "ComputerSystem", "get", "TotalPhysicalMemory"]))
        return round(int(m.group(1)) / (1024**3), 1) if m else 0.0

    readers = {"Darwin": _darwin, "Linux": _linux, "Windows": _windows}
    reader = readers.get(platform.system())
    return reader() if reader else 0.0
# Map recommended_install_path → flag the agent can pass to `comfy install`.
# Set to None when no local install is advised (verdict=cloud) or when
# comfy-cli has no flag for the hardware.
_COMFY_CLI_FLAG = {
    "nvidia": "--nvidia",
    "amd": "--amd",
    "apple-silicon": "--m-series",
    "intel": None,  # comfy-cli has no Intel Arc flag — manual install
    "comfy-cloud": None,  # cloud verdict: nothing to install locally
}
def classify(gpu: dict | None, ram_gb: float) -> tuple[str, str, list[str]]:
    """Return (verdict, recommended_install_path, notes).

    Verdict is "ok" | "marginal" | "cloud"; notes are user-facing strings
    explaining the decision. Thresholds come from the module constants above.
    NOTE(review): ram_gb is currently unused — the Apple branch reads unified
    memory from the gpu dict instead; confirm whether system RAM should also
    factor into the discrete-GPU verdicts.
    """
    notes: list[str] = []
    # No accelerator at all → CPU-only, not practical for local use.
    if gpu is None:
        notes.append(
            "No supported accelerator found (NVIDIA CUDA / AMD ROCm / Apple Silicon / Intel Arc)."
        )
        notes.append(
            "CPU-only ComfyUI works but is unusably slow for modern models — use Comfy Cloud."
        )
        return "cloud", "comfy-cloud", notes
    # Apple Silicon is judged on unified memory (GPU shares system RAM).
    if gpu["vendor"] == "apple":
        gen = gpu.get("generation", 1)
        mem = gpu.get("unified_memory_gb", 0.0)
        if mem < MIN_MAC_RAM_GB:
            notes.append(
                f"Apple Silicon with {mem} GB unified memory — below the {MIN_MAC_RAM_GB} GB practical minimum."
            )
            notes.append("SD1.5 may work; SDXL/Flux will swap or OOM. Recommend Comfy Cloud.")
            return "cloud", "comfy-cloud", notes
        if mem < OK_MAC_RAM_GB:
            notes.append(
                f"Apple Silicon M{gen} with {mem} GB — SDXL works but slow. Flux/video likely too tight."
            )
            return "marginal", "apple-silicon", notes
        notes.append(f"Apple Silicon M{gen} with {mem} GB unified memory — good for SDXL/Flux.")
        return "ok", "apple-silicon", notes
    # Discrete GPU path (nvidia/amd/intel), judged on dedicated VRAM.
    vram = gpu.get("vram_gb", 0.0)
    if gpu["vendor"] == "intel":
        # Intel Arc is always "marginal" regardless of VRAM (support is experimental).
        notes.append("Intel Arc detected — ComfyUI IPEX support is experimental; Comfy Cloud is more reliable.")
        return "marginal", "intel", notes
    if vram < MIN_VRAM_GB_USABLE:
        notes.append(
            f"{gpu['name']} has only {vram} GB VRAM — below the {MIN_VRAM_GB_USABLE} GB practical minimum."
        )
        notes.append("Most modern models won't load. Recommend Comfy Cloud.")
        return "cloud", "comfy-cloud", notes
    if vram < OK_VRAM_GB:
        notes.append(
            f"{gpu['name']} ({vram} GB VRAM) — SD1.5 works, SDXL tight, Flux/video unlikely."
        )
        return "marginal", gpu["vendor"], notes
    if vram < GREAT_VRAM_GB:
        notes.append(f"{gpu['name']} ({vram} GB VRAM) — SDXL comfortable, Flux possible with optimizations.")
        return "ok", gpu["vendor"], notes
    notes.append(f"{gpu['name']} ({vram} GB VRAM) — can run everything including Flux/video.")
    return "ok", gpu["vendor"], notes
def build_report() -> dict:
    """Assemble the full hardware report dict consumed by the agent.

    Tries each GPU detector in priority order (NVIDIA → ROCm → Apple Silicon
    → Intel Arc) and folds the classify() verdict into a flat dict matching
    the shape documented in the module docstring.
    """
    sysname = platform.system()
    arch = platform.machine()
    ram_gb = total_system_ram_gb()
    gpu = (
        detect_nvidia()
        or detect_rocm()
        or detect_apple_silicon()
        or detect_intel_arc()
    )
    # Intel Mac special case — fall out of apple-silicon detection with no GPU
    if gpu is None and sysname == "Darwin" and platform.machine() != "arm64":
        notes = [
            "Intel Mac detected — no MPS backend available.",
            "ComfyUI will fall back to CPU which is unusably slow. Use Comfy Cloud.",
        ]
        return {
            "os": sysname,
            "arch": arch,
            "system_ram_gb": ram_gb,
            "gpu": None,
            "verdict": "cloud",
            "recommended_install_path": "comfy-cloud",
            "comfy_cli_flag": None,
            "notes": notes,
            "install_urls": _install_urls(),
        }
    verdict, install_path, notes = classify(gpu, ram_gb)
    return {
        "os": sysname,
        "arch": arch,
        "system_ram_gb": ram_gb,
        "gpu": gpu,
        "verdict": verdict,
        "recommended_install_path": install_path,
        # None when no comfy-cli flag applies (intel / cloud paths).
        "comfy_cli_flag": _COMFY_CLI_FLAG.get(install_path),
        "notes": notes,
        "install_urls": _install_urls(),
    }
def _install_urls() -> dict:
return {
"desktop": "https://docs.comfy.org/installation/desktop",
"manual": "https://docs.comfy.org/installation/manual_install",
"comfy_cli": "https://docs.comfy.org/comfy-cli/getting-started",
"cloud": "https://platform.comfy.org",
}
def main() -> int:
    """Print the hardware report and return the verdict as an exit code.

    --json prints the raw report dict; otherwise a human-readable summary.
    Exit codes mirror the verdict: 0 = ok, 1 = marginal, 2 = cloud.

    Fixes garbled human output: the GPU, Verdict and notes lines were
    missing separators between adjacent f-string fields (e.g. printed
    "Apple M132 GB" and "Verdict: okapple-silicon").
    """
    report = build_report()
    json_mode = "--json" in sys.argv
    if json_mode:
        print(json.dumps(report, indent=2))
    else:
        print(f"OS: {report['os']} ({report['arch']})")
        print(f"RAM: {report['system_ram_gb']} GB")
        if report["gpu"]:
            g = report["gpu"]
            if g["vendor"] == "apple":
                print(f"GPU: {g['name']}, {g.get('unified_memory_gb', 0)} GB unified memory")
            else:
                print(f"GPU: {g['name']}, {g.get('vram_gb', 0)} GB VRAM")
        else:
            print("GPU: (none detected)")
        print(f"Verdict: {report['verdict']} → {report['recommended_install_path']}")
        if report["comfy_cli_flag"]:
            print(f"  → run: comfy --skip-prompt install {report['comfy_cli_flag']}")
        for n in report["notes"]:
            print(f"  - {n}")
    # Map verdict → exit code so shell callers can branch without parsing JSON.
    if report["verdict"] == "ok":
        return 0
    if report["verdict"] == "marginal":
        return 1
    return 2
if __name__ == "__main__":
    sys.exit(main())

View file

@ -0,0 +1,353 @@
#!/usr/bin/env python3
"""
run_workflow.py Inject parameters into a ComfyUI workflow, submit it, monitor execution,
and download outputs.
Usage:
# Local server
python3 run_workflow.py --workflow workflow_api.json \
--args '{"prompt": "a cat", "seed": 42}' \
--output-dir ./outputs
# Cloud server
python3 run_workflow.py --workflow workflow_api.json \
--args '{"prompt": "a cat"}' \
--host https://cloud.comfy.org \
--api-key comfyui-xxxxxxx \
--output-dir ./outputs
# With schema file (pre-extracted)
python3 run_workflow.py --workflow workflow_api.json \
--schema schema.json \
--args '{"prompt": "a cat"}' \
--output-dir ./outputs
Requires: Python 3.10+, requests (or urllib as fallback)
"""
import json
import sys
import time
import uuid
import copy
import argparse
from pathlib import Path
from urllib.parse import urljoin, urlencode, urlparse
try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
import urllib.request
import urllib.error
def http_get(url: str, headers: dict = None, follow_redirects: bool = True) -> tuple:
    """GET request, returns (status_code, body_bytes, response_headers)."""
    hdrs = headers or {}
    if HAS_REQUESTS:
        resp = requests.get(url, headers=hdrs, allow_redirects=follow_redirects, timeout=30)
        return resp.status_code, resp.content, dict(resp.headers)
    # urllib fallback — an HTTPError still carries a readable body + headers.
    req = urllib.request.Request(url, headers=hdrs)
    try:
        resp = urllib.request.urlopen(req, timeout=30)
    except urllib.error.HTTPError as err:
        return err.code, err.read(), dict(err.headers)
    return resp.status, resp.read(), dict(resp.headers)
def http_post(url: str, data: dict, headers: dict = None) -> tuple:
    """POST JSON request, returns (status_code, response_dict).

    Bodies that are not valid JSON come back as {"raw": <text>} in BOTH
    code paths. Previously only the requests branch tolerated this — the
    urllib fallback crashed with JSONDecodeError on a non-JSON error body
    (e.g. an HTML error page from a proxy).
    """
    payload = json.dumps(data).encode()
    hdrs = {"Content-Type": "application/json"}
    if headers:
        hdrs.update(headers)
    if HAS_REQUESTS:
        r = requests.post(url, json=data, headers=hdrs, timeout=30)
        try:
            return r.status_code, r.json()
        except Exception:
            return r.status_code, {"raw": r.text}
    else:
        def _decode(body: bytes) -> dict:
            # Mirror the requests branch: tolerate non-JSON bodies.
            try:
                return json.loads(body)
            except (json.JSONDecodeError, UnicodeDecodeError):
                return {"raw": body.decode(errors="replace")}
        req = urllib.request.Request(url, data=payload, headers=hdrs, method="POST")
        try:
            resp = urllib.request.urlopen(req, timeout=30)
            return resp.status, _decode(resp.read())
        except urllib.error.HTTPError as e:
            return e.code, _decode(e.read())
class ComfyRunner:
    """Thin HTTP client for a ComfyUI server (local or Comfy Cloud).

    Cloud mode — detected from the hostname, or forced by passing an API
    key — prefixes routes with /api and uses the /api/job/{id}/status
    endpoint for polling instead of the local /history endpoint.
    """

    def __init__(self, host: str = "http://127.0.0.1:8188", api_key: str = None):
        self.host = host.rstrip("/")
        self.api_key = api_key
        parsed_host = urlparse(self.host).hostname or ""
        # Any API key implies cloud routing, even with a custom hostname.
        self.is_cloud = parsed_host.lower() == "cloud.comfy.org" or api_key is not None
        # Random client id ties queue submissions to this process.
        self.client_id = str(uuid.uuid4())

    @property
    def headers(self) -> dict:
        """Auth headers — X-API-Key only when a key was supplied."""
        h = {}
        if self.api_key:
            h["X-API-Key"] = self.api_key
        return h

    def api_url(self, path: str) -> str:
        """Build URL. Cloud uses /api prefix for some endpoints."""
        if self.is_cloud and not path.startswith("/api"):
            # Cloud endpoints: /api/prompt, /api/view, /api/job, /api/queue
            return f"{self.host}/api{path}"
        return f"{self.host}{path}"

    def check_server(self) -> bool:
        """Check if server is reachable (any exception → False)."""
        try:
            url = self.api_url("/system_stats") if not self.is_cloud else f"{self.host}/api/system_stats"
            status, _, _ = http_get(url, self.headers)
            return status == 200
        except Exception:
            return False

    def submit(self, workflow: dict) -> dict:
        """Submit workflow for execution. Returns {prompt_id, node_errors},
        or {"error": ..., "details": ...} on a non-200 response."""
        payload = {"prompt": workflow, "client_id": self.client_id}
        if self.api_key and self.is_cloud:
            # Cloud also expects the key embedded in the prompt's extra_data.
            payload.setdefault("extra_data", {})["api_key_comfy_org"] = self.api_key
        url = self.api_url("/prompt")
        status, resp = http_post(url, payload, self.headers)
        if status != 200:
            return {"error": f"HTTP {status}", "details": resp}
        return resp

    def poll_status(self, prompt_id: str, timeout: int = 120) -> dict:
        """Poll until job completes. Returns final status dict.

        Backs off from 2s to a 10s cap between polls. Cloud reads the job
        status endpoint; local checks /history/{prompt_id} for completion.
        Returns {"status": "timeout", ...} when *timeout* elapses first.
        """
        start = time.time()
        poll_interval = 2.0
        while time.time() - start < timeout:
            if self.is_cloud:
                # Cloud has a dedicated status endpoint
                url = f"{self.host}/api/job/{prompt_id}/status"
                status, body, _ = http_get(url, self.headers)
                if status == 200:
                    data = json.loads(body) if isinstance(body, bytes) else body
                    job_status = data.get("status", "unknown")
                    if job_status == "completed":
                        return {"status": "success", "data": data}
                    elif job_status == "failed":
                        return {"status": "error", "data": data}
                    elif job_status == "cancelled":
                        return {"status": "cancelled", "data": data}
                    # still running, continue polling
            else:
                # Local: check /history/{prompt_id}
                url = f"{self.host}/history/{prompt_id}"
                status, body, _ = http_get(url, self.headers)
                if status == 200:
                    data = json.loads(body) if isinstance(body, bytes) else body
                    if prompt_id in data:
                        entry = data[prompt_id]
                        if entry.get("status", {}).get("completed", False):
                            return {"status": "success", "outputs": entry.get("outputs", {})}
                        if entry.get("status", {}).get("status_str") == "error":
                            return {"status": "error", "data": entry}
            time.sleep(poll_interval)
            poll_interval = min(poll_interval * 1.2, 10.0)
        return {"status": "timeout", "elapsed": time.time() - start}

    def get_outputs(self, prompt_id: str) -> dict:
        """Get output file info from history (empty dict on any failure)."""
        if self.is_cloud:
            url = f"{self.host}/api/job/{prompt_id}/status"
        else:
            url = f"{self.host}/history/{prompt_id}"
        status, body, _ = http_get(url, self.headers)
        if status != 200:
            return {}
        data = json.loads(body) if isinstance(body, bytes) else body
        if self.is_cloud:
            return data.get("outputs", {})
        if prompt_id in data:
            return data[prompt_id].get("outputs", {})
        return {}

    def download_output(self, filename: str, subfolder: str, file_type: str, output_dir: Path) -> Path:
        """Download a single output file via /view into *output_dir*.

        Raises RuntimeError on a non-200 response.
        NOTE(review): "(unknown)" in the error message looks like a
        placeholder — consider interpolating the filename instead.
        """
        params = urlencode({"filename": filename, "subfolder": subfolder, "type": file_type})
        url = self.api_url(f"/view?{params}")
        status, body, _ = http_get(url, self.headers, follow_redirects=True)
        if status != 200:
            raise RuntimeError(f"Failed to download (unknown): HTTP {status}")
        out_path = output_dir / filename
        out_path.write_bytes(body)
        return out_path
def load_schema(schema_path: str = None, workflow: dict = None) -> dict:
    """Load or generate parameter schema.

    A pre-extracted schema file wins; otherwise the schema is generated
    from *workflow* via the sibling extract_schema.py script. With neither
    input, an empty schema is returned.
    """
    if schema_path:
        with open(schema_path) as fh:
            return json.load(fh)
    if workflow is None:
        return {"parameters": {}}
    # No pre-extracted schema: borrow the extractor from the sibling script.
    sys.path.insert(0, str(Path(__file__).parent))
    from extract_schema import extract_schema
    return extract_schema(workflow)
def inject_params(workflow: dict, schema: dict, args: dict) -> dict:
    """Inject user parameters into workflow based on schema mapping.

    Returns a deep copy — the caller's workflow dict is never mutated.
    Unknown parameter names and dangling node ids are reported to stderr
    and skipped rather than raising.
    """
    patched = copy.deepcopy(workflow)
    mappings = schema.get("parameters", {})
    for name, value in args.items():
        mapping = mappings.get(name)
        if mapping is None:
            print(f"Warning: unknown parameter '{name}', skipping", file=sys.stderr)
            continue
        target_id = mapping["node_id"]
        target_node = patched.get(target_id)
        if target_node is not None and "inputs" in target_node:
            target_node["inputs"][mapping["field"]] = value
        else:
            print(f"Warning: node {target_id} not found in workflow", file=sys.stderr)
    return patched
def _classify_media(filename: str) -> str:
    """Map an output filename to a coarse media type by extension."""
    ext = Path(filename).suffix.lower()
    if ext in (".mp4", ".webm", ".avi", ".mov", ".gif"):
        return "video"
    if ext in (".wav", ".mp3", ".flac", ".ogg"):
        return "audio"
    return "image"


def main():
    """CLI entry point: submit a workflow, wait for it, download outputs.

    Machine-readable status is printed to stdout as JSON; progress and
    warnings go to stderr. Exits non-zero on any failure.
    """
    parser = argparse.ArgumentParser(description="Run a ComfyUI workflow with parameter injection")
    parser.add_argument("--workflow", required=True, help="Path to workflow API JSON file")
    parser.add_argument("--args", default="{}", help="JSON parameters to inject")
    parser.add_argument("--schema", help="Path to schema JSON (from extract_schema.py). Auto-generated if omitted.")
    parser.add_argument("--host", default="http://127.0.0.1:8188", help="ComfyUI server URL")
    parser.add_argument("--api-key", help="API key for cloud (X-API-Key)")
    parser.add_argument("--output-dir", default="./outputs", help="Directory to save outputs")
    parser.add_argument("--timeout", type=int, default=120, help="Max seconds to wait for completion")
    parser.add_argument("--no-download", action="store_true", help="Skip downloading outputs")
    parser.add_argument("--submit-only", action="store_true", help="Submit and return prompt_id without waiting")
    args = parser.parse_args()

    # Load workflow
    workflow_path = Path(args.workflow)
    if not workflow_path.exists():
        print(json.dumps({"error": f"Workflow file not found: {args.workflow}"}))
        sys.exit(1)
    with open(workflow_path) as f:
        workflow = json.load(f)

    # Editor-format exports have top-level "nodes"/"links"; the /prompt API
    # needs the flat {node_id: {...}} format instead.
    if "nodes" in workflow and "links" in workflow:
        print(json.dumps({"error": "Workflow is in editor format, not API format. Re-export with 'Save (API Format)'."}))
        sys.exit(1)

    # Parse user args
    try:
        user_args = json.loads(args.args)
    except json.JSONDecodeError as e:
        print(json.dumps({"error": f"Invalid --args JSON: {e}"}))
        sys.exit(1)

    # Load/generate schema and inject params
    schema = load_schema(args.schema, workflow)
    if user_args:
        workflow = inject_params(workflow, schema, user_args)

    # Connect to server
    runner = ComfyRunner(host=args.host, api_key=args.api_key)
    if not runner.check_server():
        print(json.dumps({"error": f"Cannot reach server at {args.host}. Is ComfyUI running?"}))
        sys.exit(1)

    # Submit
    result = runner.submit(workflow)
    if "error" in result:
        print(json.dumps({"error": "Submission failed", "details": result}))
        sys.exit(1)
    prompt_id = result.get("prompt_id")
    if not prompt_id:
        print(json.dumps({"error": "No prompt_id in response", "response": result}))
        sys.exit(1)

    # Per-node validation errors mean the server rejected the workflow
    # before running anything.
    node_errors = result.get("node_errors", {})
    if node_errors:
        print(json.dumps({"error": "Workflow validation failed", "node_errors": node_errors}))
        sys.exit(1)

    if args.submit_only:
        print(json.dumps({"status": "submitted", "prompt_id": prompt_id}))
        sys.exit(0)

    # Poll for completion
    print(f"Submitted: {prompt_id}. Waiting...", file=sys.stderr)
    poll_result = runner.poll_status(prompt_id, timeout=args.timeout)
    if poll_result["status"] == "timeout":
        print(json.dumps({"status": "timeout", "prompt_id": prompt_id, "elapsed": poll_result["elapsed"]}))
        sys.exit(1)
    elif poll_result["status"] == "error":
        print(json.dumps({"status": "error", "prompt_id": prompt_id, "details": poll_result.get("data")}))
        sys.exit(1)
    elif poll_result["status"] == "cancelled":
        print(json.dumps({"status": "cancelled", "prompt_id": prompt_id}))
        sys.exit(1)

    # Download outputs
    outputs = poll_result.get("outputs") or runner.get_outputs(prompt_id)
    if args.no_download:
        print(json.dumps({"status": "success", "prompt_id": prompt_id, "outputs": outputs}))
        sys.exit(0)

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    downloaded = []
    for node_id, node_output in outputs.items():
        # ComfyUI puts images/videos under "images" key (even for video)
        for key in ("images", "gifs", "videos", "audio"):
            if key not in node_output:
                continue
            for file_info in node_output[key]:
                filename = file_info.get("filename", "")
                subfolder = file_info.get("subfolder", "")
                file_type = file_info.get("type", "output")
                if not filename:
                    continue
                try:
                    out_path = runner.download_output(filename, subfolder, file_type, output_dir)
                    downloaded.append({
                        "file": str(out_path),
                        "node_id": node_id,
                        "type": _classify_media(filename),
                        "filename": filename,
                    })
                except Exception as e:
                    # Best-effort: keep downloading the rest, but name the
                    # failing file (the old message hard-coded "(unknown)").
                    print(f"Warning: failed to download {filename}: {e}", file=sys.stderr)

    print(json.dumps({
        "status": "success",
        "prompt_id": prompt_id,
        "outputs": downloaded,
    }, indent=2))
# Script entry point: only run the CLI when executed directly, not on import.
if __name__ == "__main__":
    main()