From b689624aeeef190c93a6c87f6b28ef07a44e8fc6 Mon Sep 17 00:00:00 2001 From: ethernet Date: Thu, 21 May 2026 11:01:15 -0400 Subject: [PATCH] feat(ci): 4-way matrix slicing with LPT duration-balanced distribution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit run_tests_parallel.py: - --slice I/N flag (also HERMES_TEST_SLICE env var) runs only the I-th slice of N, distributing files across slices by cached duration using LPT (Longest Processing Time first) greedy algorithm so each slice gets roughly equal wall time - Duration cache (test_durations.json): maps relative file paths to last-observed subprocess wall time. _save_durations merges with existing cache so entries from other slices are preserved. - Per-file subprocess timing in progress output + end-of-run distribution summary (percentiles, top-10 slowest, <1s/<2s counts) - Unknown files default to 2.0s estimate (~P50), spread evenly by LPT .github/workflows/tests.yml: - Matrix strategy: slice [1, 2, 3, 4] with fail-fast: false - Each slice restores duration cache from main (stable key, no SHA), runs its portion, uploads per-slice durations as artifacts - save-durations job (main only, if: always()) downloads all 4 artifacts, merges into single cache entry for future PRs - Timeout reduced from 60min to 30min per slice (~1/4 the work) Cache design: - Stable key (test-durations) not keyed by commit SHA — durations are about files, not commits, and SHA-keyed caches miss on every new commit and on PR merge commits - actions/cache scoping: main's cache is visible to all PRs targeting main; feature branches without a cache still work (default 2.0s) - No dotfile prefix (upload-artifact v7 skips hidden files) --- .github/workflows/tests.yml | 63 ++++++++++- .gitignore | 1 + scripts/run_tests_parallel.py | 203 +++++++++++++++++++++++++++++++++- 3 files changed, 258 insertions(+), 9 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 3ffaa10d009..ca3f0f3433d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -23,11 +23,22 @@ concurrency: jobs: test: runs-on: ubuntu-latest - timeout-minutes: 60 + timeout-minutes: 30 + strategy: + fail-fast: false + matrix: + slice: [1, 2, 3, 4] steps: - name: Checkout code uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + - name: Restore duration cache + uses: actions/cache/restore@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5 + with: + path: test_durations.json + # Single stable key. main always overwrites, PRs always find it. + key: test-durations + - name: Install ripgrep (prebuilt binary) run: | set -euo pipefail @@ -54,7 +65,7 @@ jobs: source .venv/bin/activate uv pip install -e ".[all,dev]" - - name: Run tests + - name: Run tests (slice ${{ matrix.slice }}/4) # Per-file isolation via scripts/run_tests_parallel.py: discovers # every test_*.py file under tests/ (excluding integration/ + e2e/), # then runs `python -m pytest ` in a freshly-spawned subprocess @@ -72,15 +83,61 @@ jobs: # state across files, which is exactly the leakage we wanted to # fix. ThreadPoolExecutor + subprocess.run is ~60 lines and does # the job with cleaner semantics. + # + # Matrix slicing (--slice I/N): files are distributed across 4 + # jobs by cached duration (LPT algorithm) so each job gets + # roughly equal wall time. Without a cache, files default to 2s + # estimate and get split roughly evenly by count — still correct, + # just not perfectly balanced. run: | source .venv/bin/activate - python scripts/run_tests_parallel.py + python scripts/run_tests_parallel.py --slice ${{ matrix.slice }}/4 env: # Ensure tests don't accidentally call real APIs OPENROUTER_API_KEY: "" OPENAI_API_KEY: "" NOUS_API_KEY: "" + - name: Upload per-slice durations + uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 + with: + name: test-durations-slice-${{ matrix.slice }} + path: test_durations.json + retention-days: 1 + + # Merge per-slice duration data into a single cache, so future runs + # (including PRs) get balanced slicing. + save-durations: + needs: test + if: always() && github.ref == 'refs/heads/main' + runs-on: ubuntu-latest + steps: + - name: Download all slice durations + uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8.0.1 + with: + pattern: test-durations-slice-* + path: durations + merge-multiple: true + + - name: Merge into single durations file + run: | + python3 -c " + import json, glob, os + merged = {} + for f in glob.glob('durations/*test_durations.json'): + with open(f) as fh: + merged.update(json.load(fh)) + with open('test_durations.json', 'w') as fh: + json.dump(merged, fh, indent=2, sort_keys=True) + print(f'Merged {len(merged)} file durations') + " + + - name: Save merged duration cache + uses: actions/cache/save@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5 + with: + path: test_durations.json + key: test-durations + e2e: runs-on: ubuntu-latest timeout-minutes: 15 diff --git a/.gitignore b/.gitignore index 2dbd15c6c7d..8bbe7235ee9 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,7 @@ __pycache__/web_tools.cpython-310.pyc logs/ data/ .pytest_cache/ +test_durations.json .pytest-cache/ tmp/ temp_vision_images/ diff --git a/scripts/run_tests_parallel.py b/scripts/run_tests_parallel.py index 7daaa6cbb1e..57178899012 100755 --- a/scripts/run_tests_parallel.py +++ b/scripts/run_tests_parallel.py @@ -38,6 +38,7 @@ Exit code: 0 if every file's pytest exited 0; 1 otherwise. from __future__ import annotations import argparse +import json import os import subprocess import sys @@ -62,6 +63,11 @@ _SKIP_PARTS = {"integration", "e2e"} # via --file-timeout or HERMES_TEST_FILE_TIMEOUT. _DEFAULT_FILE_TIMEOUT_SECONDS = 600.0 # 10 minutes +# Duration cache: maps relative file paths to last-observed subprocess +# wall-clock seconds. Used by ``--slice`` to distribute files across +# CI jobs by estimated total time, so no one job gets all the slow files. +_DURATIONS_FILE = "test_durations.json" + def _count_tests( files: List[Path], repo_root: Path, pytest_passthrough: List[str] @@ -219,10 +225,10 @@ def _run_one_file( pytest_args: List[str], repo_root: Path, file_timeout: float, -) -> Tuple[Path, int, str, dict[str, int]]: +) -> Tuple[Path, int, str, dict[str, int], float]: """Run ``python -m pytest `` in a fresh subprocess. - Returns (file, returncode, captured_combined_output, summary_counts). + Returns (file, returncode, captured_combined_output, summary_counts, subprocess_wall_seconds). ``summary_counts`` is the result of ``_parse_pytest_summary(output)`` — @@ -247,6 +253,7 @@ def _run_one_file( bound a pathologically slow or hung file as a whole. """ cmd = [sys.executable, "-m", "pytest", str(file), *pytest_args] + subproc_start = time.monotonic() proc = subprocess.Popen( cmd, cwd=repo_root, @@ -308,7 +315,8 @@ def _run_one_file( # so the operator can spot it. rc = 0 summary = _parse_pytest_summary(output) - return file, rc, output, summary + subproc_wall = time.monotonic() - subproc_start + return file, rc, output, summary, subproc_wall def _parse_pytest_summary(output: str) -> dict[str, int]: @@ -370,12 +378,17 @@ def _print_progress( tests_failed: int, test_counts: dict[Path, int], file_summary: dict[str, int] | None = None, + subproc_wall: float | None = None, ) -> None: """Single-line live progress. When ``file_summary`` is provided (parsed from pytest output), the per-file parenthetical shows individual test pass/fail counts instead of just the total test count. + + ``subproc_wall`` is the actual subprocess wall-clock time (excluding + queue-wait). When available, the display shows both the subprocess + time and the queue-inclusive elapsed time. """ status = "✓" if rc == 0 else "✗" pct = (tests_done / total_tests * 100) if total_tests else 0 @@ -407,10 +420,15 @@ def _print_progress( else: n_tests = test_counts.get(file, 0) test_str = f"{n_tests} tests, " if n_tests else "" + # Show subprocess time when available; fall back to queue-inclusive dur. + if subproc_wall is not None: + time_str = f"{subproc_wall:.1f}s" + else: + time_str = f"{dur:.1f}s" msg = ( f"[{pct:5.1f}% | {tests_done:>5}/{total_tests}" f" | ✓{tests_passed:>{fw}} | ✗{tests_failed:>{fw}}] " - f"{status} {_format_file(file, repo_root)} ({test_str}{dur:.1f}s)" + f"{status} {_format_file(file, repo_root)} ({test_str}{time_str})" ) # Truncate to terminal width if available (no clobbering ANSI lines). try: @@ -453,6 +471,107 @@ def _print_inline_failure( print(flush=True) +def _load_durations(repo_root: Path) -> dict[str, float]: + """Read the duration cache from the repo root. + + Returns a dict mapping relative file paths (e.g. + ``tests/tools/test_code_execution.py``) to wall-clock seconds from + the last run. Missing or corrupt file → empty dict (safe fallback). + """ + path = repo_root / _DURATIONS_FILE + if not path.is_file(): + return {} + try: + return json.loads(path.read_text()) + except (json.JSONDecodeError, OSError): + return {} + + +def _save_durations( + file_times: List[Tuple[Path, float]], + repo_root: Path, +) -> None: + """Write the duration cache so future ``--slice`` runs can use it. + + Merges with any existing cache so entries from files not in the + current run (e.g. from a different slice) are preserved. Keys are + repo-relative paths so the cache is portable across checkouts + and CI runners. + """ + data: dict[str, float] = _load_durations(repo_root) + for f, t in file_times: + key = _format_file(f, repo_root) + data[key] = round(t, 3) + path = repo_root / _DURATIONS_FILE + path.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n") + + +def _slice_files( + files: List[Path], + slice_index: int, + slice_count: int, + durations: dict[str, float], + repo_root: Path, +) -> List[Path]: + """Return the subset of *files* belonging to slice *slice_index*. + + Uses **Longest Processing Time first** (LPT) distribution: sort files + by estimated duration descending, then greedily assign each file to + the slice with the smallest accumulated time so far. This minimizes + the makespan (max slice duration) and keeps CI jobs balanced. + + Files with no cached duration get a default estimate of 2.0s (roughly + the P50 from profiling). This means first-time ``--slice`` runs + (no cache) still get reasonable distribution, and new files don't + all land in one slice. + + ``slice_index`` is 1-indexed (1..slice_count) for ergonomics — + ``--slice 1/4`` reads more naturally than ``--slice 0/4``. + """ + if slice_count < 2: + return files + if not (1 <= slice_index <= slice_count): + print( + f"error: --slice index must be 1..{slice_count}, got {slice_index}", + file=sys.stderr, + ) + sys.exit(2) + + # Build (file, estimated_duration) pairs. + default_dur = 2.0 + file_durs: List[Tuple[Path, float]] = [] + for f in files: + rel = _format_file(f, repo_root) + dur = durations.get(rel, default_dur) + file_durs.append((f, dur)) + + # Sort longest first (LPT). + file_durs.sort(key=lambda x: x[1], reverse=True) + + # Greedy assignment: for each file, add it to the slice with the + # smallest current total. + bucket_files: List[List[Path]] = [[] for _ in range(slice_count)] + bucket_totals: List[float] = [0.0] * slice_count + + for f, dur in file_durs: + # Find the least-loaded bucket. + min_idx = min(range(slice_count), key=lambda i: bucket_totals[i]) + bucket_files[min_idx].append(f) + bucket_totals[min_idx] += dur + + # Print slice summary for visibility. + target = bucket_files[slice_index - 1] + target_dur = bucket_totals[slice_index - 1] + total_dur = sum(bucket_totals) + print( + f"Slice {slice_index}/{slice_count}: {len(target)} files " + f"(~{target_dur:.0f}s estimated of {total_dur:.0f}s total)", + flush=True, + ) + + return target + + def main() -> int: parser = argparse.ArgumentParser( description=__doc__, @@ -487,6 +606,17 @@ def main() -> int: "Default: 600 (10 min), env: HERMES_TEST_FILE_TIMEOUT." ), ) + parser.add_argument( + "--slice", + metavar="I/N", + help=( + "Run only slice I of N (e.g. --slice 1/4). " + "Files are distributed across slices using cached durations " + "so each slice takes roughly equal wall time. " + "Without a duration cache, files are distributed by count. " + "Env: HERMES_TEST_SLICE (format: I/N)." + ), + ) parser.add_argument( "paths_positional", nargs="*", @@ -509,6 +639,20 @@ def main() -> int: our_args, pytest_passthrough = argv, [] args = parser.parse_args(our_args) + # Parse --slice (or HERMES_TEST_SLICE) early so we can exit on bad input + # before doing any expensive discovery. + slice_raw = args.slice or os.environ.get("HERMES_TEST_SLICE") + slice_index: int | None = None + slice_count: int = 1 + if slice_raw: + try: + idx_s, count_s = slice_raw.split("/", 1) + slice_index = int(idx_s) + slice_count = int(count_s) + except (ValueError, AttributeError): + print(f"error: --slice must be I/N (e.g. 1/4), got: {slice_raw!r}", file=sys.stderr) + sys.exit(2) + repo_root = Path(__file__).resolve().parent.parent # Resolve discovery roots: positional path args override --paths if any @@ -535,6 +679,15 @@ def main() -> int: test_counts = _count_tests(files, repo_root, pytest_passthrough) total_tests = sum(test_counts.values()) + # Apply slicing if requested — distribute files across CI jobs by + # estimated duration so no one job gets all the slow files. + if slice_index is not None: + durations = _load_durations(repo_root) + files = _slice_files(files, slice_index, slice_count, durations, repo_root) + # Recount after slicing. + test_counts = {f: test_counts[f] for f in files if f in test_counts} + total_tests = sum(test_counts.values()) + print( f"Discovered {len(files)} test files ({total_tests} tests) under " f"{[str(r.relative_to(repo_root)) if r.is_relative_to(repo_root) else str(r) for r in roots]}; " @@ -545,6 +698,7 @@ def main() -> int: # Capture and print on completion (out-of-order is fine — keeps the # terminal clean rather than interleaving N parallel pytest outputs). failures: List[Tuple[Path, str, Dict[str, int]]] = [] + file_times: List[Tuple[Path, float]] = [] # (file, subprocess_wall) for distribution started = time.monotonic() files_done = 0 tests_done = 0 @@ -554,11 +708,11 @@ def main() -> int: tests_failed = 0 lock = threading.Lock() - def _on_done(file: Path, started_at: float, fut: "Future[Tuple[Path, int, str, dict[str, int]]]") -> None: + def _on_done(file: Path, started_at: float, fut: "Future[Tuple[Path, int, str, dict[str, int], float]]") -> None: nonlocal files_done, tests_done, pass_count, fail_count, tests_passed, tests_failed n_tests = test_counts.get(file, 0) try: - fpath, rc, output, summary = fut.result() + fpath, rc, output, summary, subproc_wall = fut.result() except Exception as exc: # noqa: BLE001 — must always advance counter with lock: files_done += 1 @@ -570,6 +724,7 @@ def main() -> int: time.monotonic() - started_at, repo_root, tests_passed, tests_failed, test_counts, + subproc_wall=0.0, ) return with lock: @@ -578,6 +733,7 @@ def main() -> int: # Accumulate test-level counts from parsed summary. tests_passed += summary.get("passed", 0) tests_failed += summary.get("failed", 0) + file_times.append((fpath, subproc_wall)) if rc == 0: pass_count += 1 else: @@ -589,6 +745,7 @@ def main() -> int: repo_root, tests_passed, tests_failed, test_counts, file_summary=summary, + subproc_wall=subproc_wall, ) if rc != 0: _print_inline_failure(fpath, output, repo_root, pytest_passthrough) @@ -613,6 +770,40 @@ def main() -> int: pct = (tests_done / total_tests * 100) if total_tests else 0 print(f"=== Summary: {len(files)} files, {tests_passed} tests passed, {tests_failed} failed ({pct:.0f}% complete) in {elapsed:.1f}s ({args.jobs} workers) ===") + # Save durations for future --slice runs. Each slice writes its own + # partial test_durations.json; a CI merge step joins them later. + # Locally, _save_durations merges with any existing cache so entries + # from previous runs aren't lost. + if file_times: + _save_durations(file_times, repo_root) + print(f" Durations cached to {_DURATIONS_FILE} ({len(file_times)} files)") + + # Per-file time distribution (throwaway diagnostic — shows how + # subprocess time is distributed so we can see if startup dominates). + if file_times: + times = sorted([t for _, t in file_times]) + total_subproc = sum(times) + median_t = times[len(times) // 2] + p50 = median_t + p90 = times[int(len(times) * 0.90)] + p95 = times[int(len(times) * 0.95)] + p99 = times[min(int(len(times) * 0.99), len(times) - 1)] + max_t = times[-1] + # How many files finish in <1s? That's roughly "just startup". + fast = sum(1 for t in times if t < 1.0) + fast_2s = sum(1 for t in times if t < 2.0) + print() + print(f"=== Per-file subprocess time distribution ===") + print(f" Files: {len(times)}") + print(f" Total subprocess CPU-wall: {total_subproc:.1f}s (runner wall: {elapsed:.1f}s, parallelism: {args.jobs}x)") + print(f" P50: {p50:.2f}s P90: {p90:.2f}s P95: {p95:.2f}s P99: {p99:.2f}s Max: {max_t:.2f}s") + print(f" <1s: {fast} files ({fast/len(times)*100:.0f}%) <2s: {fast_2s} files ({fast_2s/len(times)*100:.0f}%)") + # Top 10 slowest files — likely the ones dragging the run. + slowest = sorted(file_times, key=lambda x: x[1], reverse=True)[:10] + print(f" Top 10 slowest:") + for f, t in slowest: + print(f" {t:>6.2f}s {_format_file(f, repo_root)}") + if failures: print() print("=== Failure output ===")