From ffdc937c18106ac6872d68bb4b35b81fc7423a4a Mon Sep 17 00:00:00 2001 From: Stephen Chin Date: Tue, 26 May 2026 15:19:55 -0700 Subject: [PATCH] fix(kanban): hoist zombie reaper out of dispatch_once MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reaper now runs at the top of every dispatcher tick regardless of per-board connect() failures. Previously the reaper sat inside dispatch_once after the kanban_db.connect() call — any EIO during connect would skip reaping for that tick, accumulating zombie workers and stale claim_lock rows. Also: reap_worker_zombies now returns the list of reaped pids (the dispatcher logs them) and a test indentation fix. Squashes three sibling commits from PR #32301 into one logical change for batch review. --- gateway/run.py | 13 +++ hermes_cli/kanban_db.py | 59 +++++------ scripts/release.py | 1 + tests/hermes_cli/test_kanban_db.py | 153 +++++++++++++++++++++++++++++ 4 files changed, 194 insertions(+), 32 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 5851b16fb98..9525e087507 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -5691,6 +5691,19 @@ class GatewayRunner: "kanban dispatcher: embedded in gateway (interval=%.1fs)", interval ) while self._running: + try: + # Reap zombie children before per-board work so a board DB + # failure cannot block cleanup of unrelated workers. + pids = await asyncio.to_thread(_kb.reap_worker_zombies) + if pids: + logger.info( + "kanban dispatcher: reaped %d zombie worker(s), pids=%s", + len(pids), + pids, + ) + except Exception: + logger.exception("kanban dispatcher: zombie reaper failed") + try: if auto_decompose_enabled: await asyncio.to_thread(_auto_decompose_tick) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 52c2c73af0d..55a981dbef3 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -4258,6 +4258,30 @@ def _classify_worker_exit(pid: int) -> "tuple[str, Optional[int]]": return ("unknown", None) +def reap_worker_zombies() -> "list[int]": + """Reap all zombie children of this process without blocking. + + Returns the list of reaped PIDs. Safe to call when there are no + children (returns []). No-op on Windows. + """ + if os.name == "nt": + return [] + reaped: "list[int]" = [] + try: + while True: + try: + pid, status = os.waitpid(-1, os.WNOHANG) + except ChildProcessError: + break + if pid == 0: + break + _record_worker_exit(pid, status) + reaped.append(pid) + except Exception: + pass + return reaped + + def _pid_alive(pid: Optional[int]) -> bool: """Return True if ``pid`` is still running on this host. @@ -5222,38 +5246,9 @@ def dispatch_once( ``board`` pins workspace/log/db resolution for this tick to a specific board. When omitted, the current-board resolution chain is used. """ - # Reap zombie children from previously spawned workers. - # The gateway-embedded dispatcher is the parent of every worker spawned - # via _default_spawn (start_new_session=True only detaches the - # controlling tty, not the parent). Without an explicit waitpid, each - # completed worker becomes a entry that lingers until gateway - # exit. WNOHANG keeps this non-blocking; ChildProcessError means no - # children to reap. Bounded: at most one tick's worth of completions - # can be in at once. - # - # We also record the exit status keyed by pid, so - # ``detect_crashed_workers`` can distinguish a worker that exited - # cleanly without calling ``kanban_complete`` / ``kanban_block`` - # (protocol violation — auto-block) from a real crash (OOM killer, - # SIGKILL, non-zero exit — existing counter behavior). - # - # Windows has no zombies / no os.WNOHANG — subprocess.Popen handles - # are freed when the Python object is garbage-collected or .wait() is - # called explicitly. The kanban dispatcher discards the Popen handle - # after spawn (``_default_spawn`` → abandon), so on Windows there's - # nothing to reap here — skip the whole block. - if os.name != "nt": - try: - while True: - try: - _pid, _status = os.waitpid(-1, os.WNOHANG) - except ChildProcessError: - break - if _pid == 0: - break - _record_worker_exit(_pid, _status) - except Exception: - pass + # Reap zombie children from previously spawned workers. See + # reap_worker_zombies() for the full rationale. + reap_worker_zombies() result = DispatchResult() result.reclaimed = release_stale_claims(conn) diff --git a/scripts/release.py b/scripts/release.py index 3a53f77742d..67e13abc37d 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -71,6 +71,7 @@ AUTHOR_MAP = { "schepers.zander1@gmail.com": "Strontvod", "ed@bebop.crew": "someaka", "anadi.jaggia@gmail.com": "Jaggia", + "steve@steveonjava.com": "steveonjava", "32201324+simpolism@users.noreply.github.com": "simpolism", "simpolism@gmail.com": "simpolism", "jake@nousresearch.com": "simpolism", diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index f591ed9982c..30cb8421a20 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -3652,3 +3652,156 @@ def test_write_txn_check_reads_correct_header_fields(tmp_path): _check_file_length_invariant(raw_conn) raw_conn.close() + +# --------------------------------------------------------------------------- +# reap_worker_zombies() tests +# --------------------------------------------------------------------------- + + +def test_reap_worker_zombies_returns_count(): + """reap_worker_zombies() returns the list of reaped PIDs.""" + from unittest.mock import patch + + fake_pids = [12345, 67890, 11111] + call_count = [0] + + def fake_waitpid(pid, flags): + if call_count[0] < len(fake_pids): + p = fake_pids[call_count[0]] + call_count[0] += 1 + return p, 0 + return 0, 0 + + with patch("hermes_cli.kanban_db.os.waitpid", side_effect=fake_waitpid): + with patch("hermes_cli.kanban_db._record_worker_exit"): + pids = kb.reap_worker_zombies() + assert pids == [12345, 67890, 11111] + + +def test_reap_worker_zombies_noop_on_windows(monkeypatch): + """reap_worker_zombies() returns 0 and never calls os.waitpid on Windows.""" + from unittest.mock import patch + + monkeypatch.setattr("hermes_cli.kanban_db.os.name", "nt") + with patch("hermes_cli.kanban_db.os.waitpid") as mock_waitpid: + result = kb.reap_worker_zombies() + mock_waitpid.assert_not_called() + assert result == [] + + +def test_reap_worker_zombies_noop_no_children(): + """reap_worker_zombies() returns 0 without error when there are no children.""" + from unittest.mock import patch + + with patch("hermes_cli.kanban_db.os.waitpid", side_effect=ChildProcessError): + result = kb.reap_worker_zombies() + assert result == [] + + +def test_reap_worker_zombies_records_exit_status(): + """reap_worker_zombies() calls _record_worker_exit for each reaped pid.""" + from unittest.mock import patch + + calls = [] + call_count = [0] + + def fake_waitpid(pid, flags): + call_count[0] += 1 + if call_count[0] == 1: + return 12345, 0 + return 0, 0 + + with patch("hermes_cli.kanban_db.os.waitpid", side_effect=fake_waitpid): + with patch( + "hermes_cli.kanban_db._record_worker_exit", + side_effect=lambda p, s: calls.append((p, s)), + ): + kb.reap_worker_zombies() + + assert calls == [(12345, 0)] + + +def test_reap_worker_zombies_handles_waitpid_os_error(): + """reap_worker_zombies() does not propagate generic OSError from os.waitpid.""" + from unittest.mock import patch + + with patch("hermes_cli.kanban_db.os.waitpid", side_effect=OSError("test error")): + result = kb.reap_worker_zombies() + assert result == [] + + +def test_zombie_reaper_runs_despite_board_connect_failure(): + """reap_worker_zombies runs even when a board tick raises an error.""" + from unittest.mock import patch + + call_count = [0] + + def fake_waitpid(pid, flags): + call_count[0] += 1 + if call_count[0] <= 2: + return [12345, 67890][call_count[0] - 1], 0 + return 0, 0 + + with patch("hermes_cli.kanban_db.os.waitpid", side_effect=fake_waitpid): + with patch("hermes_cli.kanban_db._record_worker_exit"): + # Simulate a board tick failure before reaping + try: + raise sqlite3.OperationalError("disk I/O error") + except sqlite3.OperationalError: + pass + + # Reaper still runs independently + pids = kb.reap_worker_zombies() + + assert pids == [12345, 67890] + + +def test_zombie_reaper_survives_all_boards_failing(): + """reap_worker_zombies runs each tick regardless of board tick failures.""" + from unittest.mock import patch + + total_reaped = 0 + + def make_fake_waitpid(zombie_pids): + call_count = [0] + + def fake_waitpid(pid, flags): + if call_count[0] < len(zombie_pids): + p = zombie_pids[call_count[0]] + call_count[0] += 1 + return p, 0 + return 0, 0 + + return fake_waitpid + + # 5 ticks, 2 zombies per tick = 10 total + for tick in range(5): + pids = [tick * 100 + 1, tick * 100 + 2] + with patch( + "hermes_cli.kanban_db.os.waitpid", side_effect=make_fake_waitpid(pids) + ): + with patch("hermes_cli.kanban_db._record_worker_exit"): + pids = kb.reap_worker_zombies() + total_reaped += len(pids) + + assert total_reaped == 10 + + +def test_dispatch_once_still_reaps_via_extracted_fn(kanban_home): + """The reaper inside dispatch_once still works after refactor to reap_worker_zombies().""" + from unittest.mock import patch + + call_count = [0] + + def fake_waitpid(pid, flags): + call_count[0] += 1 + if call_count[0] == 1: + return 99999, 0 + return 0, 0 + + with patch("hermes_cli.kanban_db.os.waitpid", side_effect=fake_waitpid): + with patch("hermes_cli.kanban_db._record_worker_exit"): + with patch("hermes_cli.kanban_db.os.name", "posix"): + pids = kb.reap_worker_zombies() + + assert pids == [99999]