mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-31 06:51:29 +00:00
fix(kanban): hoist zombie reaper out of dispatch_once
Reaper now runs at the top of every dispatcher tick regardless of per-board connect() failures. Previously the reaper sat inside dispatch_once after the kanban_db.connect() call — any EIO during connect would skip reaping for that tick, accumulating zombie workers and stale claim_lock rows. Also: reap_worker_zombies now returns the list of reaped pids (the dispatcher logs them) and a test indentation fix. Squashes three sibling commits from PR #32301 into one logical change for batch review.
This commit is contained in:
parent
99c19eb2fe
commit
ffdc937c18
4 changed files with 194 additions and 32 deletions
|
|
@ -5691,6 +5691,19 @@ class GatewayRunner:
|
||||||
"kanban dispatcher: embedded in gateway (interval=%.1fs)", interval
|
"kanban dispatcher: embedded in gateway (interval=%.1fs)", interval
|
||||||
)
|
)
|
||||||
while self._running:
|
while self._running:
|
||||||
|
try:
|
||||||
|
# Reap zombie children before per-board work so a board DB
|
||||||
|
# failure cannot block cleanup of unrelated workers.
|
||||||
|
pids = await asyncio.to_thread(_kb.reap_worker_zombies)
|
||||||
|
if pids:
|
||||||
|
logger.info(
|
||||||
|
"kanban dispatcher: reaped %d zombie worker(s), pids=%s",
|
||||||
|
len(pids),
|
||||||
|
pids,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("kanban dispatcher: zombie reaper failed")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if auto_decompose_enabled:
|
if auto_decompose_enabled:
|
||||||
await asyncio.to_thread(_auto_decompose_tick)
|
await asyncio.to_thread(_auto_decompose_tick)
|
||||||
|
|
|
||||||
|
|
@ -4258,6 +4258,30 @@ def _classify_worker_exit(pid: int) -> "tuple[str, Optional[int]]":
|
||||||
return ("unknown", None)
|
return ("unknown", None)
|
||||||
|
|
||||||
|
|
||||||
|
def reap_worker_zombies() -> "list[int]":
|
||||||
|
"""Reap all zombie children of this process without blocking.
|
||||||
|
|
||||||
|
Returns the list of reaped PIDs. Safe to call when there are no
|
||||||
|
children (returns []). No-op on Windows.
|
||||||
|
"""
|
||||||
|
if os.name == "nt":
|
||||||
|
return []
|
||||||
|
reaped: "list[int]" = []
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
pid, status = os.waitpid(-1, os.WNOHANG)
|
||||||
|
except ChildProcessError:
|
||||||
|
break
|
||||||
|
if pid == 0:
|
||||||
|
break
|
||||||
|
_record_worker_exit(pid, status)
|
||||||
|
reaped.append(pid)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return reaped
|
||||||
|
|
||||||
|
|
||||||
def _pid_alive(pid: Optional[int]) -> bool:
|
def _pid_alive(pid: Optional[int]) -> bool:
|
||||||
"""Return True if ``pid`` is still running on this host.
|
"""Return True if ``pid`` is still running on this host.
|
||||||
|
|
||||||
|
|
@ -5222,38 +5246,9 @@ def dispatch_once(
|
||||||
``board`` pins workspace/log/db resolution for this tick to a specific
|
``board`` pins workspace/log/db resolution for this tick to a specific
|
||||||
board. When omitted, the current-board resolution chain is used.
|
board. When omitted, the current-board resolution chain is used.
|
||||||
"""
|
"""
|
||||||
# Reap zombie children from previously spawned workers.
|
# Reap zombie children from previously spawned workers. See
|
||||||
# The gateway-embedded dispatcher is the parent of every worker spawned
|
# reap_worker_zombies() for the full rationale.
|
||||||
# via _default_spawn (start_new_session=True only detaches the
|
reap_worker_zombies()
|
||||||
# controlling tty, not the parent). Without an explicit waitpid, each
|
|
||||||
# completed worker becomes a <defunct> entry that lingers until gateway
|
|
||||||
# exit. WNOHANG keeps this non-blocking; ChildProcessError means no
|
|
||||||
# children to reap. Bounded: at most one tick's worth of completions
|
|
||||||
# can be in <defunct> at once.
|
|
||||||
#
|
|
||||||
# We also record the exit status keyed by pid, so
|
|
||||||
# ``detect_crashed_workers`` can distinguish a worker that exited
|
|
||||||
# cleanly without calling ``kanban_complete`` / ``kanban_block``
|
|
||||||
# (protocol violation — auto-block) from a real crash (OOM killer,
|
|
||||||
# SIGKILL, non-zero exit — existing counter behavior).
|
|
||||||
#
|
|
||||||
# Windows has no zombies / no os.WNOHANG — subprocess.Popen handles
|
|
||||||
# are freed when the Python object is garbage-collected or .wait() is
|
|
||||||
# called explicitly. The kanban dispatcher discards the Popen handle
|
|
||||||
# after spawn (``_default_spawn`` → abandon), so on Windows there's
|
|
||||||
# nothing to reap here — skip the whole block.
|
|
||||||
if os.name != "nt":
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
_pid, _status = os.waitpid(-1, os.WNOHANG)
|
|
||||||
except ChildProcessError:
|
|
||||||
break
|
|
||||||
if _pid == 0:
|
|
||||||
break
|
|
||||||
_record_worker_exit(_pid, _status)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
result = DispatchResult()
|
result = DispatchResult()
|
||||||
result.reclaimed = release_stale_claims(conn)
|
result.reclaimed = release_stale_claims(conn)
|
||||||
|
|
|
||||||
|
|
@ -71,6 +71,7 @@ AUTHOR_MAP = {
|
||||||
"schepers.zander1@gmail.com": "Strontvod",
|
"schepers.zander1@gmail.com": "Strontvod",
|
||||||
"ed@bebop.crew": "someaka",
|
"ed@bebop.crew": "someaka",
|
||||||
"anadi.jaggia@gmail.com": "Jaggia",
|
"anadi.jaggia@gmail.com": "Jaggia",
|
||||||
|
"steve@steveonjava.com": "steveonjava",
|
||||||
"32201324+simpolism@users.noreply.github.com": "simpolism",
|
"32201324+simpolism@users.noreply.github.com": "simpolism",
|
||||||
"simpolism@gmail.com": "simpolism",
|
"simpolism@gmail.com": "simpolism",
|
||||||
"jake@nousresearch.com": "simpolism",
|
"jake@nousresearch.com": "simpolism",
|
||||||
|
|
|
||||||
|
|
@ -3652,3 +3652,156 @@ def test_write_txn_check_reads_correct_header_fields(tmp_path):
|
||||||
_check_file_length_invariant(raw_conn)
|
_check_file_length_invariant(raw_conn)
|
||||||
raw_conn.close()
|
raw_conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# reap_worker_zombies() tests
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_reap_worker_zombies_returns_count():
|
||||||
|
"""reap_worker_zombies() returns the list of reaped PIDs."""
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
fake_pids = [12345, 67890, 11111]
|
||||||
|
call_count = [0]
|
||||||
|
|
||||||
|
def fake_waitpid(pid, flags):
|
||||||
|
if call_count[0] < len(fake_pids):
|
||||||
|
p = fake_pids[call_count[0]]
|
||||||
|
call_count[0] += 1
|
||||||
|
return p, 0
|
||||||
|
return 0, 0
|
||||||
|
|
||||||
|
with patch("hermes_cli.kanban_db.os.waitpid", side_effect=fake_waitpid):
|
||||||
|
with patch("hermes_cli.kanban_db._record_worker_exit"):
|
||||||
|
pids = kb.reap_worker_zombies()
|
||||||
|
assert pids == [12345, 67890, 11111]
|
||||||
|
|
||||||
|
|
||||||
|
def test_reap_worker_zombies_noop_on_windows(monkeypatch):
|
||||||
|
"""reap_worker_zombies() returns 0 and never calls os.waitpid on Windows."""
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
monkeypatch.setattr("hermes_cli.kanban_db.os.name", "nt")
|
||||||
|
with patch("hermes_cli.kanban_db.os.waitpid") as mock_waitpid:
|
||||||
|
result = kb.reap_worker_zombies()
|
||||||
|
mock_waitpid.assert_not_called()
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_reap_worker_zombies_noop_no_children():
|
||||||
|
"""reap_worker_zombies() returns 0 without error when there are no children."""
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
with patch("hermes_cli.kanban_db.os.waitpid", side_effect=ChildProcessError):
|
||||||
|
result = kb.reap_worker_zombies()
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_reap_worker_zombies_records_exit_status():
|
||||||
|
"""reap_worker_zombies() calls _record_worker_exit for each reaped pid."""
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
calls = []
|
||||||
|
call_count = [0]
|
||||||
|
|
||||||
|
def fake_waitpid(pid, flags):
|
||||||
|
call_count[0] += 1
|
||||||
|
if call_count[0] == 1:
|
||||||
|
return 12345, 0
|
||||||
|
return 0, 0
|
||||||
|
|
||||||
|
with patch("hermes_cli.kanban_db.os.waitpid", side_effect=fake_waitpid):
|
||||||
|
with patch(
|
||||||
|
"hermes_cli.kanban_db._record_worker_exit",
|
||||||
|
side_effect=lambda p, s: calls.append((p, s)),
|
||||||
|
):
|
||||||
|
kb.reap_worker_zombies()
|
||||||
|
|
||||||
|
assert calls == [(12345, 0)]
|
||||||
|
|
||||||
|
|
||||||
|
def test_reap_worker_zombies_handles_waitpid_os_error():
|
||||||
|
"""reap_worker_zombies() does not propagate generic OSError from os.waitpid."""
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
with patch("hermes_cli.kanban_db.os.waitpid", side_effect=OSError("test error")):
|
||||||
|
result = kb.reap_worker_zombies()
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_zombie_reaper_runs_despite_board_connect_failure():
|
||||||
|
"""reap_worker_zombies runs even when a board tick raises an error."""
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
call_count = [0]
|
||||||
|
|
||||||
|
def fake_waitpid(pid, flags):
|
||||||
|
call_count[0] += 1
|
||||||
|
if call_count[0] <= 2:
|
||||||
|
return [12345, 67890][call_count[0] - 1], 0
|
||||||
|
return 0, 0
|
||||||
|
|
||||||
|
with patch("hermes_cli.kanban_db.os.waitpid", side_effect=fake_waitpid):
|
||||||
|
with patch("hermes_cli.kanban_db._record_worker_exit"):
|
||||||
|
# Simulate a board tick failure before reaping
|
||||||
|
try:
|
||||||
|
raise sqlite3.OperationalError("disk I/O error")
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Reaper still runs independently
|
||||||
|
pids = kb.reap_worker_zombies()
|
||||||
|
|
||||||
|
assert pids == [12345, 67890]
|
||||||
|
|
||||||
|
|
||||||
|
def test_zombie_reaper_survives_all_boards_failing():
|
||||||
|
"""reap_worker_zombies runs each tick regardless of board tick failures."""
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
total_reaped = 0
|
||||||
|
|
||||||
|
def make_fake_waitpid(zombie_pids):
|
||||||
|
call_count = [0]
|
||||||
|
|
||||||
|
def fake_waitpid(pid, flags):
|
||||||
|
if call_count[0] < len(zombie_pids):
|
||||||
|
p = zombie_pids[call_count[0]]
|
||||||
|
call_count[0] += 1
|
||||||
|
return p, 0
|
||||||
|
return 0, 0
|
||||||
|
|
||||||
|
return fake_waitpid
|
||||||
|
|
||||||
|
# 5 ticks, 2 zombies per tick = 10 total
|
||||||
|
for tick in range(5):
|
||||||
|
pids = [tick * 100 + 1, tick * 100 + 2]
|
||||||
|
with patch(
|
||||||
|
"hermes_cli.kanban_db.os.waitpid", side_effect=make_fake_waitpid(pids)
|
||||||
|
):
|
||||||
|
with patch("hermes_cli.kanban_db._record_worker_exit"):
|
||||||
|
pids = kb.reap_worker_zombies()
|
||||||
|
total_reaped += len(pids)
|
||||||
|
|
||||||
|
assert total_reaped == 10
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_once_still_reaps_via_extracted_fn(kanban_home):
|
||||||
|
"""The reaper inside dispatch_once still works after refactor to reap_worker_zombies()."""
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
call_count = [0]
|
||||||
|
|
||||||
|
def fake_waitpid(pid, flags):
|
||||||
|
call_count[0] += 1
|
||||||
|
if call_count[0] == 1:
|
||||||
|
return 99999, 0
|
||||||
|
return 0, 0
|
||||||
|
|
||||||
|
with patch("hermes_cli.kanban_db.os.waitpid", side_effect=fake_waitpid):
|
||||||
|
with patch("hermes_cli.kanban_db._record_worker_exit"):
|
||||||
|
with patch("hermes_cli.kanban_db.os.name", "posix"):
|
||||||
|
pids = kb.reap_worker_zombies()
|
||||||
|
|
||||||
|
assert pids == [99999]
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue