feat(ci): 4-way matrix slicing with LPT duration-balanced distribution

run_tests_parallel.py:
  - --slice I/N flag (also HERMES_TEST_SLICE env var) runs only the
    I-th slice of N, distributing files across slices by cached
    duration using LPT (Longest Processing Time first) greedy
    algorithm so each slice gets roughly equal wall time
  - Duration cache (test_durations.json): maps relative file paths to
    last-observed subprocess wall time. _save_durations merges with
    existing cache so entries from other slices are preserved.
  - Per-file subprocess timing in progress output + end-of-run
    distribution summary (percentiles, top-10 slowest, <1s/<2s counts)
  - Unknown files default to 2.0s estimate (~P50), spread evenly by LPT

.github/workflows/tests.yml:
  - Matrix strategy: slice [1, 2, 3, 4] with fail-fast: false
  - Each slice restores duration cache from main (stable key, no SHA),
    runs its portion, uploads per-slice durations as artifacts
  - save-durations job (main only, if: always()) downloads all 4
    artifacts, merges into single cache entry for future PRs
  - Timeout reduced from 60min to 30min per slice (~1/4 the work)

Cache design:
  - Stable key (test-durations) not keyed by commit SHA — durations
    are about files, not commits, and SHA-keyed caches miss on every
    new commit and on PR merge commits
  - actions/cache scoping: main's cache is visible to all PRs targeting
    main; feature branches without a cache still work (default 2.0s)
  - No dotfile prefix (upload-artifact v7 skips hidden files)
This commit is contained in:
ethernet 2026-05-21 11:01:15 -04:00 committed by Teknium
parent a84cec61ca
commit b689624aee
3 changed files with 258 additions and 9 deletions

View file

@ -23,11 +23,22 @@ concurrency:
jobs:
test:
runs-on: ubuntu-latest
timeout-minutes: 60
timeout-minutes: 30
strategy:
fail-fast: false
matrix:
slice: [1, 2, 3, 4]
steps:
- name: Checkout code
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Restore duration cache
uses: actions/cache/restore@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
with:
path: test_durations.json
# Single stable key. main always overwrites, PRs always find it.
key: test-durations
- name: Install ripgrep (prebuilt binary)
run: |
set -euo pipefail
@ -54,7 +65,7 @@ jobs:
source .venv/bin/activate
uv pip install -e ".[all,dev]"
- name: Run tests
- name: Run tests (slice ${{ matrix.slice }}/4)
# Per-file isolation via scripts/run_tests_parallel.py: discovers
# every test_*.py file under tests/ (excluding integration/ + e2e/),
# then runs `python -m pytest <file>` in a freshly-spawned subprocess
@ -72,15 +83,61 @@ jobs:
# state across files, which is exactly the leakage we wanted to
# fix. ThreadPoolExecutor + subprocess.run is ~60 lines and does
# the job with cleaner semantics.
#
# Matrix slicing (--slice I/N): files are distributed across 4
# jobs by cached duration (LPT algorithm) so each job gets
# roughly equal wall time. Without a cache, files default to 2s
# estimate and get split roughly evenly by count — still correct,
# just not perfectly balanced.
run: |
source .venv/bin/activate
python scripts/run_tests_parallel.py
python scripts/run_tests_parallel.py --slice ${{ matrix.slice }}/4
env:
# Ensure tests don't accidentally call real APIs
OPENROUTER_API_KEY: ""
OPENAI_API_KEY: ""
NOUS_API_KEY: ""
- name: Upload per-slice durations
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
with:
name: test-durations-slice-${{ matrix.slice }}
path: test_durations.json
retention-days: 1
# Merge per-slice duration data into a single cache, so future runs
# (including PRs) get balanced slicing.
save-durations:
needs: test
if: always() && github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- name: Download all slice durations
uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8.0.1
with:
pattern: test-durations-slice-*
path: durations
merge-multiple: true
- name: Merge into single durations file
run: |
python3 -c "
import json, glob, os
merged = {}
for f in glob.glob('durations/*test_durations.json'):
with open(f) as fh:
merged.update(json.load(fh))
with open('test_durations.json', 'w') as fh:
json.dump(merged, fh, indent=2, sort_keys=True)
print(f'Merged {len(merged)} file durations')
"
- name: Save merged duration cache
uses: actions/cache/save@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
with:
path: test_durations.json
key: test-durations
e2e:
runs-on: ubuntu-latest
timeout-minutes: 15

1
.gitignore vendored
View file

@ -18,6 +18,7 @@ __pycache__/web_tools.cpython-310.pyc
logs/
data/
.pytest_cache/
test_durations.json
.pytest-cache/
tmp/
temp_vision_images/

View file

@ -38,6 +38,7 @@ Exit code: 0 if every file's pytest exited 0; 1 otherwise.
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
@ -62,6 +63,11 @@ _SKIP_PARTS = {"integration", "e2e"}
# via --file-timeout or HERMES_TEST_FILE_TIMEOUT.
_DEFAULT_FILE_TIMEOUT_SECONDS = 600.0 # 10 minutes
# Duration cache: maps relative file paths to last-observed subprocess
# wall-clock seconds. Used by ``--slice`` to distribute files across
# CI jobs by estimated total time, so no one job gets all the slow files.
_DURATIONS_FILE = "test_durations.json"
def _count_tests(
files: List[Path], repo_root: Path, pytest_passthrough: List[str]
@ -219,10 +225,10 @@ def _run_one_file(
pytest_args: List[str],
repo_root: Path,
file_timeout: float,
) -> Tuple[Path, int, str, dict[str, int]]:
) -> Tuple[Path, int, str, dict[str, int], float]:
"""Run ``python -m pytest <file> <pytest_args>`` in a fresh subprocess.
Returns (file, returncode, captured_combined_output, summary_counts).
Returns (file, returncode, captured_combined_output, summary_counts, subprocess_wall_seconds).
``summary_counts`` is the result of ``_parse_pytest_summary(output)``
@ -247,6 +253,7 @@ def _run_one_file(
bound a pathologically slow or hung file as a whole.
"""
cmd = [sys.executable, "-m", "pytest", str(file), *pytest_args]
subproc_start = time.monotonic()
proc = subprocess.Popen(
cmd,
cwd=repo_root,
@ -308,7 +315,8 @@ def _run_one_file(
# so the operator can spot it.
rc = 0
summary = _parse_pytest_summary(output)
return file, rc, output, summary
subproc_wall = time.monotonic() - subproc_start
return file, rc, output, summary, subproc_wall
def _parse_pytest_summary(output: str) -> dict[str, int]:
@ -370,12 +378,17 @@ def _print_progress(
tests_failed: int,
test_counts: dict[Path, int],
file_summary: dict[str, int] | None = None,
subproc_wall: float | None = None,
) -> None:
"""Single-line live progress.
When ``file_summary`` is provided (parsed from pytest output), the
per-file parenthetical shows individual test pass/fail counts instead
of just the total test count.
``subproc_wall`` is the actual subprocess wall-clock time (excluding
queue-wait). When available, the display shows both the subprocess
time and the queue-inclusive elapsed time.
"""
status = "" if rc == 0 else ""
pct = (tests_done / total_tests * 100) if total_tests else 0
@ -407,10 +420,15 @@ def _print_progress(
else:
n_tests = test_counts.get(file, 0)
test_str = f"{n_tests} tests, " if n_tests else ""
# Show subprocess time when available; fall back to queue-inclusive dur.
if subproc_wall is not None:
time_str = f"{subproc_wall:.1f}s"
else:
time_str = f"{dur:.1f}s"
msg = (
f"[{pct:5.1f}% | {tests_done:>5}/{total_tests}"
f" | ✓{tests_passed:>{fw}} | ✗{tests_failed:>{fw}}] "
f"{status} {_format_file(file, repo_root)} ({test_str}{dur:.1f}s)"
f"{status} {_format_file(file, repo_root)} ({test_str}{time_str})"
)
# Truncate to terminal width if available (no clobbering ANSI lines).
try:
@ -453,6 +471,107 @@ def _print_inline_failure(
print(flush=True)
def _load_durations(repo_root: Path) -> dict[str, float]:
"""Read the duration cache from the repo root.
Returns a dict mapping relative file paths (e.g.
``tests/tools/test_code_execution.py``) to wall-clock seconds from
the last run. Missing or corrupt file empty dict (safe fallback).
"""
path = repo_root / _DURATIONS_FILE
if not path.is_file():
return {}
try:
return json.loads(path.read_text())
except (json.JSONDecodeError, OSError):
return {}
def _save_durations(
file_times: List[Tuple[Path, float]],
repo_root: Path,
) -> None:
"""Write the duration cache so future ``--slice`` runs can use it.
Merges with any existing cache so entries from files not in the
current run (e.g. from a different slice) are preserved. Keys are
repo-relative paths so the cache is portable across checkouts
and CI runners.
"""
data: dict[str, float] = _load_durations(repo_root)
for f, t in file_times:
key = _format_file(f, repo_root)
data[key] = round(t, 3)
path = repo_root / _DURATIONS_FILE
path.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n")
def _slice_files(
files: List[Path],
slice_index: int,
slice_count: int,
durations: dict[str, float],
repo_root: Path,
) -> List[Path]:
"""Return the subset of *files* belonging to slice *slice_index*.
Uses **Longest Processing Time first** (LPT) distribution: sort files
by estimated duration descending, then greedily assign each file to
the slice with the smallest accumulated time so far. This minimizes
the makespan (max slice duration) and keeps CI jobs balanced.
Files with no cached duration get a default estimate of 2.0s (roughly
the P50 from profiling). This means first-time ``--slice`` runs
(no cache) still get reasonable distribution, and new files don't
all land in one slice.
``slice_index`` is 1-indexed (1..slice_count) for ergonomics
``--slice 1/4`` reads more naturally than ``--slice 0/4``.
"""
if slice_count < 2:
return files
if not (1 <= slice_index <= slice_count):
print(
f"error: --slice index must be 1..{slice_count}, got {slice_index}",
file=sys.stderr,
)
sys.exit(2)
# Build (file, estimated_duration) pairs.
default_dur = 2.0
file_durs: List[Tuple[Path, float]] = []
for f in files:
rel = _format_file(f, repo_root)
dur = durations.get(rel, default_dur)
file_durs.append((f, dur))
# Sort longest first (LPT).
file_durs.sort(key=lambda x: x[1], reverse=True)
# Greedy assignment: for each file, add it to the slice with the
# smallest current total.
bucket_files: List[List[Path]] = [[] for _ in range(slice_count)]
bucket_totals: List[float] = [0.0] * slice_count
for f, dur in file_durs:
# Find the least-loaded bucket.
min_idx = min(range(slice_count), key=lambda i: bucket_totals[i])
bucket_files[min_idx].append(f)
bucket_totals[min_idx] += dur
# Print slice summary for visibility.
target = bucket_files[slice_index - 1]
target_dur = bucket_totals[slice_index - 1]
total_dur = sum(bucket_totals)
print(
f"Slice {slice_index}/{slice_count}: {len(target)} files "
f"(~{target_dur:.0f}s estimated of {total_dur:.0f}s total)",
flush=True,
)
return target
def main() -> int:
parser = argparse.ArgumentParser(
description=__doc__,
@ -487,6 +606,17 @@ def main() -> int:
"Default: 600 (10 min), env: HERMES_TEST_FILE_TIMEOUT."
),
)
parser.add_argument(
"--slice",
metavar="I/N",
help=(
"Run only slice I of N (e.g. --slice 1/4). "
"Files are distributed across slices using cached durations "
"so each slice takes roughly equal wall time. "
"Without a duration cache, files are distributed by count. "
"Env: HERMES_TEST_SLICE (format: I/N)."
),
)
parser.add_argument(
"paths_positional",
nargs="*",
@ -509,6 +639,20 @@ def main() -> int:
our_args, pytest_passthrough = argv, []
args = parser.parse_args(our_args)
# Parse --slice (or HERMES_TEST_SLICE) early so we can exit on bad input
# before doing any expensive discovery.
slice_raw = args.slice or os.environ.get("HERMES_TEST_SLICE")
slice_index: int | None = None
slice_count: int = 1
if slice_raw:
try:
idx_s, count_s = slice_raw.split("/", 1)
slice_index = int(idx_s)
slice_count = int(count_s)
except (ValueError, AttributeError):
print(f"error: --slice must be I/N (e.g. 1/4), got: {slice_raw!r}", file=sys.stderr)
sys.exit(2)
repo_root = Path(__file__).resolve().parent.parent
# Resolve discovery roots: positional path args override --paths if any
@ -535,6 +679,15 @@ def main() -> int:
test_counts = _count_tests(files, repo_root, pytest_passthrough)
total_tests = sum(test_counts.values())
# Apply slicing if requested — distribute files across CI jobs by
# estimated duration so no one job gets all the slow files.
if slice_index is not None:
durations = _load_durations(repo_root)
files = _slice_files(files, slice_index, slice_count, durations, repo_root)
# Recount after slicing.
test_counts = {f: test_counts[f] for f in files if f in test_counts}
total_tests = sum(test_counts.values())
print(
f"Discovered {len(files)} test files ({total_tests} tests) under "
f"{[str(r.relative_to(repo_root)) if r.is_relative_to(repo_root) else str(r) for r in roots]}; "
@ -545,6 +698,7 @@ def main() -> int:
# Capture and print on completion (out-of-order is fine — keeps the
# terminal clean rather than interleaving N parallel pytest outputs).
failures: List[Tuple[Path, str, Dict[str, int]]] = []
file_times: List[Tuple[Path, float]] = [] # (file, subprocess_wall) for distribution
started = time.monotonic()
files_done = 0
tests_done = 0
@ -554,11 +708,11 @@ def main() -> int:
tests_failed = 0
lock = threading.Lock()
def _on_done(file: Path, started_at: float, fut: "Future[Tuple[Path, int, str, dict[str, int]]]") -> None:
def _on_done(file: Path, started_at: float, fut: "Future[Tuple[Path, int, str, dict[str, int], float]]") -> None:
nonlocal files_done, tests_done, pass_count, fail_count, tests_passed, tests_failed
n_tests = test_counts.get(file, 0)
try:
fpath, rc, output, summary = fut.result()
fpath, rc, output, summary, subproc_wall = fut.result()
except Exception as exc: # noqa: BLE001 — must always advance counter
with lock:
files_done += 1
@ -570,6 +724,7 @@ def main() -> int:
time.monotonic() - started_at,
repo_root, tests_passed, tests_failed,
test_counts,
subproc_wall=0.0,
)
return
with lock:
@ -578,6 +733,7 @@ def main() -> int:
# Accumulate test-level counts from parsed summary.
tests_passed += summary.get("passed", 0)
tests_failed += summary.get("failed", 0)
file_times.append((fpath, subproc_wall))
if rc == 0:
pass_count += 1
else:
@ -589,6 +745,7 @@ def main() -> int:
repo_root, tests_passed, tests_failed,
test_counts,
file_summary=summary,
subproc_wall=subproc_wall,
)
if rc != 0:
_print_inline_failure(fpath, output, repo_root, pytest_passthrough)
@ -613,6 +770,40 @@ def main() -> int:
pct = (tests_done / total_tests * 100) if total_tests else 0
print(f"=== Summary: {len(files)} files, {tests_passed} tests passed, {tests_failed} failed ({pct:.0f}% complete) in {elapsed:.1f}s ({args.jobs} workers) ===")
# Save durations for future --slice runs. Each slice writes its own
# partial test_durations.json; a CI merge step joins them later.
# Locally, _save_durations merges with any existing cache so entries
# from previous runs aren't lost.
if file_times:
_save_durations(file_times, repo_root)
print(f" Durations cached to {_DURATIONS_FILE} ({len(file_times)} files)")
# Per-file time distribution (throwaway diagnostic — shows how
# subprocess time is distributed so we can see if startup dominates).
if file_times:
times = sorted([t for _, t in file_times])
total_subproc = sum(times)
median_t = times[len(times) // 2]
p50 = median_t
p90 = times[int(len(times) * 0.90)]
p95 = times[int(len(times) * 0.95)]
p99 = times[min(int(len(times) * 0.99), len(times) - 1)]
max_t = times[-1]
# How many files finish in <1s? That's roughly "just startup".
fast = sum(1 for t in times if t < 1.0)
fast_2s = sum(1 for t in times if t < 2.0)
print()
print(f"=== Per-file subprocess time distribution ===")
print(f" Files: {len(times)}")
print(f" Total subprocess CPU-wall: {total_subproc:.1f}s (runner wall: {elapsed:.1f}s, parallelism: {args.jobs}x)")
print(f" P50: {p50:.2f}s P90: {p90:.2f}s P95: {p95:.2f}s P99: {p99:.2f}s Max: {max_t:.2f}s")
print(f" <1s: {fast} files ({fast/len(times)*100:.0f}%) <2s: {fast_2s} files ({fast_2s/len(times)*100:.0f}%)")
# Top 10 slowest files — likely the ones dragging the run.
slowest = sorted(file_times, key=lambda x: x[1], reverse=True)[:10]
print(f" Top 10 slowest:")
for f, t in slowest:
print(f" {t:>6.2f}s {_format_file(f, repo_root)}")
if failures:
print()
print("=== Failure output ===")