#!/usr/bin/env python3
"""Collect CI job/step timings from the GitHub API and generate an HTML diff report.

In CI, the script reads GITHUB_TOKEN, GITHUB_REPOSITORY, GITHUB_RUN_ID, and
GITHUB_SHA from the environment to collect timings via the REST API.

If a baseline JSON file (ci-timings-baseline.json by default) exists, the
report includes a diff with per-job and per-step deltas, plus a gantt chart
overlaying current vs baseline bars.

Usage:
    # Collect from API (CI mode):
    python scripts/ci/timings_report.py

    # Regenerate HTML from saved JSON (testing):
    python scripts/ci/timings_report.py --from-json ci-timings.json
"""

from __future__ import annotations

import argparse
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime
from html import escape

API_BASE = "https://api.github.com"

# Per-request socket timeout so a stalled connection cannot hang the CI job.
API_TIMEOUT_S = 30


# ---------------------------------------------------------------------------
# GitHub API helpers
# ---------------------------------------------------------------------------

def api_get(path: str, token: str, params: dict | None = None,
            list_key: str | None = None) -> list | dict:
    """Authenticated GitHub API GET with automatic pagination.

    For list endpoints, pass list_key to extract items from the paginated
    wrapper response (e.g. list_key='jobs' for
    {'total_count': N, 'jobs': [...]}). When list_key is omitted, a non-list
    response is returned as-is (single object).

    Args:
        path: API path appended to API_BASE (e.g. "/repos/o/r/actions/runs").
        token: Bearer token sent in the Authorization header.
        params: Optional query parameters for the first request.
        list_key: Key of the item list inside a wrapper object response.

    Returns:
        The accumulated list of items across all pages, or the decoded JSON
        object for a single-object response.

    Raises:
        urllib.error.URLError: On network/HTTP failure or timeout.
    """
    url = f"{API_BASE}{path}"
    if params:
        url += "?" + urllib.parse.urlencode(params)

    results: list = []
    while url:
        req = urllib.request.Request(url, headers={
            "Authorization": f"Bearer {token}",
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
            "User-Agent": "ci-timings-report",
        })
        with urllib.request.urlopen(req, timeout=API_TIMEOUT_S) as resp:
            data = json.loads(resp.read())
            link_header = resp.headers.get("Link", "")

        if list_key:
            results.extend(data.get(list_key, []))
        elif isinstance(data, list):
            results.extend(data)
        else:
            return data

        # Follow the rel="next" entry of the Link header, if any.
        next_url = None
        for part in link_header.split(","):
            part = part.strip()
            if 'rel="next"' in part:
                next_url = part[part.find("<") + 1:part.find(">")]
                break
        url = next_url

    return results


def parse_ts(ts: str | None) -> datetime | None:
    """Parse an ISO-8601 timestamp (a trailing "Z" is accepted); None/empty -> None."""
    if not ts:
        return None
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def dur_s(started: str | None, completed: str | None) -> float | None:
    """Return seconds between two timestamps, or None if either is missing."""
    s = parse_ts(started)
    e = parse_ts(completed)
    if not s or not e:
        return None
    return (e - s).total_seconds()


# ---------------------------------------------------------------------------
# Timings collection
# ---------------------------------------------------------------------------

def _normalize_job(raw: dict) -> dict:
    """Reduce a raw API job object to the fields stored in the timings JSON."""
    steps = [
        {
            "name": step.get("name", ""),
            "number": step.get("number", 0),
            "status": step.get("status", ""),
            "conclusion": step.get("conclusion", ""),
            "started_at": step.get("started_at"),
            "completed_at": step.get("completed_at"),
            "duration_s": dur_s(step.get("started_at"), step.get("completed_at")),
        }
        for step in (raw.get("steps") or [])
    ]
    return {
        "name": raw.get("name", "unknown"),
        "workflow_name": raw.get("_workflow_name", ""),
        "job_id": raw.get("id"),
        "status": raw.get("status", ""),
        "conclusion": raw.get("conclusion", ""),
        "started_at": raw.get("started_at"),
        "completed_at": raw.get("completed_at"),
        "duration_s": dur_s(raw.get("started_at"), raw.get("completed_at")),
        "html_url": raw.get("html_url", ""),
        "steps": steps,
    }


def collect_timings(token: str, repo: str, run_id: str, head_sha: str) -> dict:
    """Collect job/step timings from the GitHub API.

    1. Get orchestrator run's direct jobs (detect, all-checks-pass, etc.).
       Skip workflow-call placeholder jobs (step name starts with
       "Run ./.github/").
    2. Find sub-workflow runs via head_sha + event=workflow_call.
    3. Get each sub-workflow run's jobs with full step timing.

    Returns:
        Dict with "run_id", "head_sha", "created_at" and a "jobs" list of
        normalized jobs sorted by start time.
    """
    owner, repo_name = repo.split("/")

    # Orchestrator run info
    run_info = api_get(f"/repos/{owner}/{repo_name}/actions/runs/{run_id}", token)
    created_at = run_info.get("created_at", "")

    # Orchestrator direct jobs
    orch_jobs = api_get(f"/repos/{owner}/{repo_name}/actions/runs/{run_id}/jobs",
                        token, list_key="jobs")
    direct = []
    for job in orch_jobs:
        steps = job.get("steps") or []
        if any(s.get("name", "").startswith("Run ./.github/") for s in steps):
            continue  # workflow-call placeholder
        if job.get("status") in ("in_progress", "queued"):
            continue  # skip self / unfinished
        direct.append(job)

    # Sub-workflow runs
    sub_runs = api_get(f"/repos/{owner}/{repo_name}/actions/runs", token, params={
        "head_sha": head_sha,
        "event": "workflow_call",
        "per_page": 100,
    }, list_key="workflow_runs")
    # ISO-8601 strings in the same format compare chronologically.
    sub_runs = [r for r in sub_runs if r.get("created_at", "") >= created_at]

    sub_jobs_raw = []
    for sr in sub_runs:
        sr_id = sr["id"]
        sr_name = sr.get("name", "")
        sr_jobs = api_get(f"/repos/{owner}/{repo_name}/actions/runs/{sr_id}/jobs",
                          token, list_key="jobs")
        for j in sr_jobs:
            j["_workflow_name"] = sr_name
            j["_workflow_run_id"] = sr_id
            sub_jobs_raw.append(j)

    # Normalize + sort
    all_jobs = [_normalize_job(j) for j in direct + sub_jobs_raw]
    all_jobs = [j for j in all_jobs if j["status"] not in ("in_progress", "queued")]
    all_jobs.sort(key=lambda j: j.get("started_at") or "")

    return {
        "run_id": run_id,
        "head_sha": head_sha,
        "created_at": created_at,
        "jobs": all_jobs,
    }


# ---------------------------------------------------------------------------
# Formatting helpers
# ---------------------------------------------------------------------------

def fmt_dur(seconds: float | None) -> str:
    """Format a duration as "12.3s", "2m" or "1m30s"; None renders as an em dash.

    Rounding is applied before splitting into minutes and seconds so values
    just below a boundary carry over (119.6 -> "2m", never "1m60s").
    """
    if seconds is None:
        return "—"
    if round(seconds, 1) < 60:
        return f"{seconds:.1f}s"
    m, s = divmod(round(seconds), 60)
    if s == 0:
        return f"{m}m"
    return f"{m}m{s}s"


def fmt_delta(current: float | None, baseline: float | None) -> tuple[str, str]:
    """Return (text, css_class) for a delta.

    css_class is "neutral" for changes under one second, otherwise "slower"
    or "faster". A missing operand yields ("—", "neutral").
    """
    if current is None or baseline is None:
        return ("—", "neutral")

    delta = current - baseline
    if baseline == 0:
        # A percentage of zero is undefined.
        pct_str = "new" if delta > 0 else "0%"
    else:
        pct_str = f"{delta / baseline * 100:+.1f}%"

    if abs(delta) < 1.0:
        cls = "neutral"
    elif delta > 0:
        cls = "slower"
    else:
        cls = "faster"
    return (f"{delta:+.1f}s ({pct_str})", cls)


def nice_ticks(max_seconds: float, num_ticks: int = 8) -> list[int]:
    """Return axis tick positions (seconds) using a round step size.

    The list starts at 0 and extends at least one step past max_seconds.
    """
    if max_seconds <= 0:
        return [0]
    raw = max_seconds / num_ticks
    for nice in [5, 10, 15, 30, 60, 120, 180, 300, 600, 900, 1800, 3600, 7200]:
        if nice >= raw:
            step = nice
            break
    else:
        step = max(int(raw), 3600)
    return list(range(0, int(max_seconds) + step + 1, step))


def fmt_tick(seconds: int) -> str:
    """Format an integer tick value as "45s", "2m" or "1m30s"."""
    if seconds < 60:
        return f"{seconds}s"
    m, s = divmod(seconds, 60)
    if s == 0:
        return f"{m}m"
    return f"{m}m{s}s"


# ---------------------------------------------------------------------------
# Stats computation
# ---------------------------------------------------------------------------

def _wall_seconds(jobs: list) -> float | None:
    """Seconds from the earliest job start to the latest job end, or None."""
    starts = [s for s in (parse_ts(j.get("started_at")) for j in jobs) if s is not None]
    ends = [e for e in (parse_ts(j.get("completed_at")) for j in jobs) if e is not None]
    if not starts or not ends:
        return None
    return (max(ends) - min(starts)).total_seconds()


def compute_stats(timings: dict, baseline: dict | None = None) -> dict:
    """Compute global wall/compute time and per-job faster/slower counts.

    Jobs are matched to the baseline by name. A job counts as unchanged when
    its duration differs from the baseline by less than one second.
    """
    jobs = timings.get("jobs", [])
    bl_job_list = (baseline or {}).get("jobs", [])
    bl_jobs = {j["name"]: j for j in bl_job_list}

    # Wall time
    wall = _wall_seconds(jobs) or 0
    compute = sum(j.get("duration_s") or 0 for j in jobs)

    # Baseline wall/compute
    bl_wall = None
    bl_compute = None
    if baseline:
        bl_wall = _wall_seconds(bl_job_list)
        bl_compute = sum(j.get("duration_s") or 0 for j in bl_job_list)

    # Per-job deltas
    faster = 0
    slower = 0
    unchanged = 0
    no_baseline = 0
    for j in jobs:
        bl = bl_jobs.get(j["name"])
        if not bl:
            no_baseline += 1
            continue
        cur_d = j.get("duration_s") or 0
        bl_d = bl.get("duration_s") or 0
        if abs(cur_d - bl_d) < 1.0:
            unchanged += 1
        elif cur_d > bl_d:
            slower += 1
        else:
            faster += 1

    return {
        "wall": wall,
        "compute": compute,
        "bl_wall": bl_wall,
        "bl_compute": bl_compute,
        "faster": faster,
        "slower": slower,
        "unchanged": unchanged,
        "no_baseline": no_baseline,
        "total_jobs": len(jobs),
    }


# ---------------------------------------------------------------------------
# HTML generation
# ---------------------------------------------------------------------------

CSS = """
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
  font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Helvetica, Arial, sans-serif;
  background: #0d1117; color: #e6edf3; line-height: 1.5; padding: 24px;
}
h1 { font-size: 24px; border-bottom: 1px solid #30363d; padding-bottom: 12px; margin-bottom: 8px; }
.meta { color: #8b949e; font-size: 13px; margin-bottom: 24px; }
h2 { font-size: 18px; margin: 32px 0 12px; }

/* Stats cards */
.stats { display: flex; gap: 12px; flex-wrap: wrap; margin-bottom: 24px; }
.stat-card {
  background: #161b22; border: 1px solid #30363d; border-radius: 8px;
  padding: 14px 18px; min-width: 140px;
}
.stat-label { font-size: 12px; color: #8b949e; text-transform: uppercase; letter-spacing: 0.5px; }
.stat-value { font-size: 22px; font-weight: 600; margin: 4px 0; }
.stat-delta { font-size: 13px; }
.faster { color: #3fb950; }
.slower { color: #f85149; }
.neutral { color: #8b949e; }

/* Gantt */
.gantt-wrap { overflow-x: auto; }
.gantt { min-width: 700px; }
.gantt-row { display: flex; align-items: center; height: 28px; }
.gantt-label {
  width: 220px; padding-right: 12px; text-align: right;
  font-size: 12px; white-space: nowrap; overflow: hidden; text-overflow: ellipsis;
}
.gantt-track { flex: 1; position: relative; height: 100%; border-left: 1px solid #21262d; }
.gantt-bar {
  position: absolute; height: 18px; border-radius: 3px;
  display: flex; align-items: center; justify-content: center;
  font-size: 10px; color: transparent; overflow: hidden;
  transition: color 0.15s;
}
.gantt-bar:hover { color: #fff; z-index: 10; }
.gantt-bar.current { background: #1f6feb; top: 5px; z-index: 2; }
.gantt-bar.baseline {
  background: transparent; border: 1px dashed #8b949e;
  top: 2px; height: 24px; z-index: 1;
}
.gantt-axis {
  display: flex; height: 20px; position: relative;
  border-top: 1px solid #30363d; margin-top: 4px;
}
.gantt-tick {
  position: absolute; font-size: 10px; color: #8b949e;
  transform: translateX(-50%); top: 4px;
}
.gantt-tick::before {
  content: ''; position: absolute; top: -4px; left: 50%;
  width: 1px; height: 4px; background: #30363d;
}
.legend { display: flex; gap: 16px; margin-top: 8px; font-size: 12px; color: #8b949e; }
.legend-swatch {
  display: inline-block; width: 16px; height: 10px;
  border-radius: 2px; margin-right: 4px; vertical-align: middle;
}

/* Tables */
table { border-collapse: collapse; width: 100%; font-size: 13px; margin-bottom: 16px; }
th, td { border: 1px solid #30363d; padding: 6px 10px; text-align: left; }
th { background: #161b22; font-weight: 600; position: sticky; top: 0; }
tr:hover td { background: #161b22; }
.num { text-align: right; font-variant-numeric: tabular-nums; }
.job-name { font-weight: 500; }

/* Step details */
details {
  margin-bottom: 8px; background: #161b22;
  border: 1px solid #30363d; border-radius: 6px;
}
summary {
  padding: 8px 12px; cursor: pointer; font-weight: 500;
  font-size: 14px; user-select: none;
}
summary:hover { background: #21262d; }
details[open] summary { border-bottom: 1px solid #30363d; }
details table { border: none; margin: 0; }
details td, details th { font-size: 12px; }

/* Worst regressions */
.regressions { margin-bottom: 24px; }
.regressions table { font-size: 13px; }
.tag {
  display: inline-block; padding: 1px 6px; border-radius: 3px;
  font-size: 11px; font-weight: 500;
}
.tag.slow { background: rgba(248,81,73,0.15); color: #f85149; }
.tag.fast { background: rgba(63,185,80,0.15); color: #3fb950; }
"""


def _display_name(job: dict) -> str:
    """HTML-escaped job label, prefixed with the workflow name when present."""
    if job.get("workflow_name"):
        return f'{escape(job["workflow_name"])} / {escape(job["name"])}'
    return escape(job["name"])


def _gantt_bars(timings: dict, baseline: dict | None) -> str:
    """Render the gantt chart HTML.

    Both current and baseline timelines are normalized to start at t=0
    (relative to each run's earliest job start). The axis scale spans
    0..max_end across both runs so bars are directly comparable.
    """
    jobs = [j for j in timings.get("jobs", [])
            if j.get("started_at") and j.get("completed_at")]
    bl_map = {j["name"]: j for j in (baseline or {}).get("jobs", [])}

    # Current run: relative offsets from earliest start
    cur_starts = [s for s in (parse_ts(j.get("started_at")) for j in jobs) if s is not None]
    cur_ends = [e for e in (parse_ts(j.get("completed_at")) for j in jobs) if e is not None]
    if not cur_starts or not cur_ends:
        return '<p>No timing data available.</p>'
    cur_t0 = min(cur_starts)
    cur_max = (max(cur_ends) - cur_t0).total_seconds()

    # Baseline run: relative offsets from its earliest start
    bl_spans = []
    for bl_j in bl_map.values():
        s = parse_ts(bl_j.get("started_at"))
        e = parse_ts(bl_j.get("completed_at"))
        if s is not None and e is not None:
            bl_spans.append((s, e))
    bl_t0 = min(s for s, _ in bl_spans) if bl_spans else None
    bl_max = (max(e for _, e in bl_spans) - bl_t0).total_seconds() if bl_spans else 0.0

    total_s = max(cur_max, bl_max)
    if total_s <= 0:
        total_s = 1  # avoid division by zero when every job is instantaneous

    rows = []
    for j in jobs:
        s = parse_ts(j.get("started_at"))
        e = parse_ts(j.get("completed_at"))
        if s is None or e is None:
            continue
        left = (s - cur_t0).total_seconds() / total_s * 100
        width = max((e - s).total_seconds() / total_s * 100, 0.5)  # min 0.5% for visibility
        dur = j.get("duration_s") or 0

        bl = bl_map.get(j["name"])
        bl_bar = ""
        if bl and bl_t0 is not None:
            bl_s = parse_ts(bl.get("started_at"))
            bl_e = parse_ts(bl.get("completed_at"))
            if bl_s is not None and bl_e is not None:
                bl_left = (bl_s - bl_t0).total_seconds() / total_s * 100
                bl_width = max((bl_e - bl_s).total_seconds() / total_s * 100, 0.5)
                bl_dur = bl.get("duration_s") or 0
                bl_bar = (
                    f'<div class="gantt-bar baseline" '
                    f'style="left:{bl_left:.2f}%;width:{bl_width:.2f}%" '
                    f'title="Baseline: {fmt_dur(bl_dur)}"></div>'
                )

        name_display = _display_name(j)

        delta_info = ""
        if bl and bl.get("duration_s") is not None:
            d_text, _ = fmt_delta(dur, bl.get("duration_s"))
            delta_info = f' — {d_text}'

        rows.append(
            f'<div class="gantt-row">'
            f'<div class="gantt-label" title="{name_display}">{name_display}</div>'
            f'<div class="gantt-track">'
            f'{bl_bar}'
            f'<div class="gantt-bar current" '
            f'style="left:{left:.2f}%;width:{width:.2f}%" '
            f'title="{name_display}: {fmt_dur(dur)}{delta_info}">{fmt_dur(dur)}</div>'
            f'</div>'
            f'</div>'
        )

    # Axis: nice_ticks() overshoots the timeline by up to one step; drop ticks
    # past the end so labels never land beyond 100% of the track.
    tick_html = "".join(
        f'<span class="gantt-tick" style="left:{t / total_s * 100:.2f}%">{fmt_tick(t)}</span>'
        for t in nice_ticks(total_s) if t <= total_s
    )
    # The axis is shifted by the label column width so ticks line up with bars.
    axis = f'<div class="gantt-axis" style="margin-left:220px">{tick_html}</div>'

    legend = (
        '<div class="legend">'
        '<span><span class="legend-swatch" style="background:#1f6feb"></span>Current</span>'
    )
    if baseline:
        legend += (
            '<span><span class="legend-swatch" '
            'style="border:1px dashed #8b949e"></span>Baseline (main)</span>'
        )
    legend += '</div>'

    return (
        f'<div class="gantt-wrap"><div class="gantt">{"".join(rows)}{axis}</div></div>'
        f'{legend}'
    )


def _stats_cards(stats: dict) -> str:
    """Render the global stats cards from a compute_stats() result."""
    wall_text = fmt_dur(stats["wall"])
    wall_delta = ""
    if stats["bl_wall"] is not None:
        d, cls = fmt_delta(stats["wall"], stats["bl_wall"])
        wall_delta = f'<div class="stat-delta {cls}">{d}</div>'

    compute_text = fmt_dur(stats["compute"])
    compute_delta = ""
    if stats["bl_compute"] is not None:
        d, cls = fmt_delta(stats["compute"], stats["bl_compute"])
        compute_delta = f'<div class="stat-delta {cls}">{d}</div>'

    cards = [
        f'<div class="stat-card"><div class="stat-label">Wall Time</div>'
        f'<div class="stat-value">{wall_text}</div>{wall_delta}</div>',
        f'<div class="stat-card"><div class="stat-label">Total Compute</div>'
        f'<div class="stat-value">{compute_text}</div>{compute_delta}</div>',
        f'<div class="stat-card"><div class="stat-label">Jobs Faster</div>'
        f'<div class="stat-value faster">{stats["faster"]}</div></div>',
        f'<div class="stat-card"><div class="stat-label">Jobs Slower</div>'
        f'<div class="stat-value slower">{stats["slower"]}</div></div>',
        f'<div class="stat-card"><div class="stat-label">Unchanged</div>'
        f'<div class="stat-value neutral">{stats["unchanged"]}</div></div>',
        f'<div class="stat-card"><div class="stat-label">No Baseline</div>'
        f'<div class="stat-value neutral">{stats["no_baseline"]}</div></div>',
    ]
    return f'<div class="stats">{"".join(cards)}</div>'


def _job_table(timings: dict, baseline: dict | None) -> str:
    """Render the per-job comparison table (current vs baseline, by job name)."""
    bl_map = {j["name"]: j for j in (baseline or {}).get("jobs", [])}

    rows = []
    for j in timings.get("jobs", []):
        dur = j.get("duration_s")
        bl = bl_map.get(j["name"])
        bl_dur = bl.get("duration_s") if bl else None
        delta_text, delta_cls = fmt_delta(dur, bl_dur)

        name = _display_name(j)

        concl = j.get("conclusion", "")
        concl_icon = {"success": "✓", "failure": "✗", "skipped": "⊘"}.get(concl, "?")
        concl_cls = {"success": "faster", "failure": "slower",
                     "skipped": "neutral"}.get(concl, "neutral")

        rows.append(
            f'<tr>'
            f'<td class="job-name">{name}</td>'
            f'<td class="num">{fmt_dur(dur)}</td>'
            f'<td class="num">{fmt_dur(bl_dur)}</td>'
            f'<td class="num {delta_cls}">{delta_text}</td>'
            f'<td class="{concl_cls}">{concl_icon}</td>'
            f'</tr>'
        )

    return (
        '<table>'
        '<thead><tr>'
        '<th>Job</th><th>Current</th><th>Baseline</th><th>Delta</th><th>Status</th>'
        '</tr></thead><tbody>'
        + "".join(rows) +
        '</tbody></table>'
    )


def _step_details(timings: dict, baseline: dict | None) -> str:
    """Render one collapsible per-step table for every job that has steps."""
    bl_map = {j["name"]: j for j in (baseline or {}).get("jobs", [])}

    blocks = []
    for j in timings.get("jobs", []):
        if not j.get("steps"):
            continue

        bl = bl_map.get(j["name"], {})
        # "steps" may be null in hand-edited or older JSON files.
        bl_steps = {s["name"]: s for s in (bl.get("steps") or [])}
        dur = j.get("duration_s") or 0
        bl_dur = bl.get("duration_s") if bl else None
        delta_text, delta_cls = fmt_delta(dur, bl_dur)

        summary_text = f'{escape(j["name"])} — {fmt_dur(dur)}'
        if bl_dur is not None:
            summary_text += f' <span class="{delta_cls}">({delta_text})</span>'

        step_rows = []
        for s in j["steps"]:
            s_dur = s.get("duration_s")
            bl_s = bl_steps.get(s["name"])
            bl_s_dur = bl_s.get("duration_s") if bl_s else None
            s_delta, s_cls = fmt_delta(s_dur, bl_s_dur)
            step_rows.append(
                f'<tr>'
                f'<td>{escape(s["name"])}</td>'
                f'<td class="num">{fmt_dur(s_dur)}</td>'
                f'<td class="num">{fmt_dur(bl_s_dur)}</td>'
                f'<td class="num {s_cls}">{s_delta}</td>'
                f'</tr>'
            )

        blocks.append(
            f'<details><summary>{summary_text}</summary>'
            f'<table>'
            '<thead><tr>'
            '<th>Step</th><th>Current</th><th>Baseline</th><th>Delta</th>'
            '</tr></thead>'
            f'<tbody>{"".join(step_rows)}</tbody></table>'
            f'</details>'
        )

    return "".join(blocks) if blocks else '<p>No step data available.</p>'


def _regressions(timings: dict, baseline: dict | None) -> str:
    """Show top 10 biggest absolute regressions/improvements across all steps.

    Returns an empty string when there is no baseline or no step changed by
    at least one second.
    """
    if not baseline:
        return ""

    bl_map = {j["name"]: j for j in baseline.get("jobs", [])}
    deltas = []  # (abs_delta, delta, job_name, step_name, current, baseline)

    for j in timings.get("jobs", []):
        bl = bl_map.get(j["name"])
        if not bl:
            continue
        bl_steps = {s["name"]: s for s in (bl.get("steps") or [])}
        for s in (j.get("steps") or []):
            bl_s = bl_steps.get(s["name"])
            if not bl_s:
                continue
            cur = s.get("duration_s") or 0
            bl_d = bl_s.get("duration_s") or 0
            diff = cur - bl_d
            if abs(diff) < 1.0:
                continue
            deltas.append((abs(diff), diff, j["name"], s["name"], cur, bl_d))

    deltas.sort(key=lambda x: x[0], reverse=True)
    top = deltas[:10]
    if not top:
        return ""

    rows = []
    for _, diff, job, step, cur, bl_d in top:
        cls = "slower" if diff > 0 else "faster"
        tag_cls = "slow" if diff > 0 else "fast"
        tag = f'<span class="tag {tag_cls}">{"+" if diff > 0 else ""}{diff:.1f}s</span>'
        rows.append(
            f'<tr>'
            f'<td>{escape(job)}</td>'
            f'<td>{escape(step)}</td>'
            f'<td class="num">{fmt_dur(cur)}</td>'
            f'<td class="num">{fmt_dur(bl_d)}</td>'
            f'<td class="num {cls}">{tag}</td>'
            f'</tr>'
        )

    return (
        '<div class="regressions">'
        '<table>'
        '<thead><tr>'
        '<th>Job</th><th>Step</th><th>Current</th><th>Baseline</th><th>Delta</th>'
        '</tr></thead><tbody>'
        + "".join(rows) +
        '</tbody></table>'
        '</div>'
    )


def generate_html(timings: dict, baseline: dict | None = None) -> str:
    """Build the complete standalone HTML report.

    Args:
        timings: Timings dict as produced by collect_timings() or loaded
            from its JSON dump.
        baseline: Optional timings dict of the baseline run to diff against.

    Returns:
        The HTML document as a string.
    """
    stats = compute_stats(timings, baseline)
    # Values may come from arbitrary JSON (ints, nulls), so coerce to str and
    # escape everything interpolated into markup.
    sha_short = escape(str(timings.get("head_sha") or "")[:7])
    run_id = escape(str(timings.get("run_id", "?")))
    created = escape(str(timings.get("created_at") or ""))

    bl_info = ""
    if baseline:
        bl_sha = escape(str(baseline.get("head_sha") or "")[:7])
        bl_info = f' | Baseline: <code>{bl_sha}</code> (main)'

    html = (
        f'<!DOCTYPE html>\n<html lang="en">\n<head>\n'
        f'<meta charset="utf-8">\n'
        f'<meta name="viewport" content="width=device-width, initial-scale=1">\n'
        f'<title>CI Timing Report — {sha_short}</title>\n'
        f'<style>{CSS}</style>\n'
        f'</head>\n<body>\n'
        f'<h1>CI Timing Report</h1>\n'
        f'<div class="meta">Run {run_id} | SHA <code>{sha_short}</code>'
        f' | Generated {created}{bl_info}</div>\n'
    )

    html += '<h2>Global Stats</h2>\n'
    html += _stats_cards(stats)

    # Only emit the heading when there is something to list under it.
    regressions = _regressions(timings, baseline)
    if regressions:
        html += '<h2>Top Regressions &amp; Improvements</h2>\n'
        html += regressions

    html += '<h2>Gantt Chart</h2>\n'
    html += _gantt_bars(timings, baseline)

    html += '<h2>Per-Job Comparison</h2>\n'
    html += _job_table(timings, baseline)

    html += '<h2>Step Details</h2>\n'
    html += _step_details(timings, baseline)

    html += '</body>\n</html>\n'
    return html


# ---------------------------------------------------------------------------
# Markdown summary for $GITHUB_STEP_SUMMARY
# ---------------------------------------------------------------------------

def generate_summary(timings: dict, baseline: dict | None = None) -> str:
    """Build the Markdown summary table of global stats."""
    stats = compute_stats(timings, baseline)

    lines = ["## CI Timing Summary\n"]

    # Global stats table
    lines.append("| Metric | Current | Baseline | Delta |")
    lines.append("|--------|---------|----------|-------|")

    wall_d = ""
    if stats["bl_wall"] is not None:
        wall_d, _ = fmt_delta(stats["wall"], stats["bl_wall"])
    lines.append(
        f"| Wall time | {fmt_dur(stats['wall'])} | {fmt_dur(stats['bl_wall'])} | {wall_d} |"
    )

    compute_d = ""
    if stats["bl_compute"] is not None:
        compute_d, _ = fmt_delta(stats["compute"], stats["bl_compute"])
    lines.append(
        f"| Total compute | {fmt_dur(stats['compute'])} | {fmt_dur(stats['bl_compute'])} | {compute_d} |"
    )

    lines.append(f"| Jobs faster | {stats['faster']} | — | — |")
    lines.append(f"| Jobs slower | {stats['slower']} | — | — |")
    lines.append(f"| Jobs unchanged | {stats['unchanged']} | — | — |")
    lines.append(f"| Jobs without baseline | {stats['no_baseline']} | — | — |")
    lines.append("")

    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def expect_env(var: str) -> str:
    """Return the value of a required environment variable.

    Raises:
        ValueError: If the variable is unset or empty.
    """
    val = os.environ.get(var)
    if not val:
        raise ValueError(f"missing environment variable {var}")
    return val


def main():
    """CLI entry point: collect or load timings, then write JSON, HTML and Markdown."""
    parser = argparse.ArgumentParser(description="Collect CI timings and generate HTML report")
    parser.add_argument("--from-json", help="Read timings from JSON instead of API")
    parser.add_argument("--baseline", default="ci-timings-baseline.json",
                        help="Baseline JSON path (default: ci-timings-baseline.json)")
    parser.add_argument("--output", default="ci-timings-report.html",
                        help="HTML output path (default: ci-timings-report.html)")
    parser.add_argument("--json-out", default="ci-timings.json",
                        help="JSON output path (default: ci-timings.json)")
    parser.add_argument("--summary-out", default="ci-timings-summary.md",
                        help="Markdown summary output path (default: ci-timings-summary.md)")
    args = parser.parse_args()

    # Collect or load timings
    if args.from_json:
        with open(args.from_json, encoding="utf-8") as f:
            timings = json.load(f)
    else:
        token = expect_env("GITHUB_TOKEN")
        repo = expect_env("GITHUB_REPOSITORY")
        run_id = expect_env("GITHUB_RUN_ID")
        head_sha = expect_env("GITHUB_SHA")
        timings = collect_timings(token, repo, run_id, head_sha)

        # Save JSON
        with open(args.json_out, "w", encoding="utf-8") as f:
            json.dump(timings, f, indent=2)
        print(f"Saved timings to {args.json_out} ({len(timings.get('jobs', []))} jobs)")

    # Load baseline
    baseline = None
    if os.path.exists(args.baseline):
        with open(args.baseline, encoding="utf-8") as f:
            baseline = json.load(f)
        print(f"Loaded baseline from {args.baseline}")
    else:
        print(f"No baseline file at {args.baseline} — generating current-only report")

    # Generate HTML
    html = generate_html(timings, baseline)
    with open(args.output, "w", encoding="utf-8") as f:
        f.write(html)
    print(f"Generated HTML report: {args.output}")

    # Write summary (append mode: the target may already hold earlier content)
    summary = generate_summary(timings, baseline)
    with open(args.summary_out, "a", encoding="utf-8") as f:
        f.write(summary)
    print(f"Wrote summary to {args.summary_out}")


if __name__ == "__main__":
    main()