diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index ff2e1cb254b..519517773f1 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -1804,6 +1804,31 @@ def claim_task( lock = claimer or _claimer_id() expires = now + int(ttl_seconds) with write_txn(conn): + # Structural invariant: never transition ready -> running while any + # parent is not yet 'done'. This is the single enforcement point + # regardless of which writer (create_task, link_tasks, unblock_task, + # release_stale_claims, manual SQL) set status='ready'. If a racy + # writer promoted a task with undone parents, demote it back to + # 'todo' here — recompute_ready will re-promote when the parents + # actually finish. See RCA at + # kanban/boards/cookai/workspaces/t_a6acd07d/root-cause.md. + undone = conn.execute( + "SELECT 1 FROM task_links l " + "JOIN tasks p ON p.id = l.parent_id " + "WHERE l.child_id = ? AND p.status != 'done' LIMIT 1", + (task_id,), + ).fetchone() + if undone: + conn.execute( + "UPDATE tasks SET status = 'todo' " + "WHERE id = ? AND status = 'ready'", + (task_id,), + ) + _append_event( + conn, task_id, "claim_rejected", + {"reason": "parents_not_done"}, + ) + return None # Defensive: if a prior run somehow leaked (invariant violation from # an unknown code path), close it as 'reclaimed' so we don't strand # it when the CAS resets the pointer below. No-op when the invariant @@ -2503,14 +2528,30 @@ def unblock_task(conn: sqlite3.Connection, task_id: str) -> bool: """, (now, int(stale["current_run_id"])), ) - cur = conn.execute( - "UPDATE tasks SET status = 'ready', current_run_id = NULL " - "WHERE id = ? AND status = 'blocked'", + # Re-gate on parent completion before flipping 'blocked' back to + # 'ready'. Unconditionally setting status='ready' here bypasses the + # parent-completion invariant (the dispatcher trusts that column); + # if parents are still in progress the task must wait in 'todo' + # until recompute_ready picks it up. RCA: Bug 2 at + # kanban/boards/cookai/workspaces/t_a6acd07d/root-cause.md. + undone_parents = conn.execute( + "SELECT 1 FROM task_links l " + "JOIN tasks p ON p.id = l.parent_id " + "WHERE l.child_id = ? AND p.status != 'done' LIMIT 1", (task_id,), + ).fetchone() + new_status = "todo" if undone_parents else "ready" + cur = conn.execute( + "UPDATE tasks SET status = ?, current_run_id = NULL " + "WHERE id = ? AND status = 'blocked'", + (new_status, task_id), ) if cur.rowcount != 1: return False - _append_event(conn, task_id, "unblocked", None) + _append_event( + conn, task_id, "unblocked", + {"status": new_status} if new_status != "ready" else None, + ) return True diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index 324782dad66..af9fb1da43c 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -298,6 +298,122 @@ def test_block_then_unblock(kanban_home): assert kb.get_task(conn, t).status == "ready" +# --------------------------------------------------------------------------- +# Parent-completion invariant at the claim gate (RCA t_a6acd07d) +# --------------------------------------------------------------------------- + +def test_claim_rejects_when_parents_not_done(kanban_home): + """claim_task must refuse ready->running if any parent isn't 'done'. + + Simulates the create-then-link race: a task gets status='ready' via a + racy writer while it still has undone parents. The claim gate must + detect the violation, demote the child back to 'todo', append a + 'claim_rejected' event, and return None. Covers Fix 1 of the RCA. + """ + with kb.connect() as conn: + parent = kb.create_task(conn, title="parent", assignee="a") + child = kb.create_task( + conn, title="child", assignee="a", parents=[parent], + ) + # Child correctly starts 'todo' because parent is not 'done'. + assert kb.get_task(conn, child).status == "todo" + # Simulate the race: a racy writer force-promotes the child to + # 'ready' while parent is still pending. + conn.execute( + "UPDATE tasks SET status='ready' WHERE id=?", (child,), + ) + conn.commit() + assert kb.get_task(conn, child).status == "ready" + + result = kb.claim_task(conn, child, claimer="host:1") + + assert result is None + with kb.connect() as conn: + assert kb.get_task(conn, child).status == "todo" + events = conn.execute( + "SELECT kind, payload FROM task_events " + "WHERE task_id = ? ORDER BY id", + (child,), + ).fetchall() + kinds = [e["kind"] for e in events] + assert "claim_rejected" in kinds + # No 'claimed' event was emitted for the blocked attempt. + assert "claimed" not in kinds + + +def test_claim_succeeds_once_parents_done(kanban_home): + """After parents complete, recompute_ready -> claim_task must succeed.""" + with kb.connect() as conn: + parent = kb.create_task(conn, title="parent", assignee="a") + child = kb.create_task( + conn, title="child", assignee="a", parents=[parent], + ) + kb.claim_task(conn, parent) + assert kb.complete_task(conn, parent, result="ok") + kb.recompute_ready(conn) + assert kb.get_task(conn, child).status == "ready" + claimed = kb.claim_task(conn, child, claimer="host:1") + assert claimed is not None + assert claimed.status == "running" + + +def test_create_with_parents_stays_todo_until_parents_done(kanban_home): + """kanban_create(parents=[...]) must land in 'todo' and only promote on parent done.""" + with kb.connect() as conn: + parent = kb.create_task(conn, title="parent", assignee="a") + child = kb.create_task( + conn, title="child", assignee="a", parents=[parent], + ) + assert kb.get_task(conn, child).status == "todo" + # Dispatcher tick between create and some later event must NOT + # produce a winner for this child. + promoted = kb.recompute_ready(conn) + assert promoted == 0 + assert kb.get_task(conn, child).status == "todo" + # Complete parent; complete_task internally runs recompute_ready, + # which promotes the child to 'ready'. + kb.claim_task(conn, parent) + kb.complete_task(conn, parent, result="ok") + assert kb.get_task(conn, child).status == "ready" + + +def test_unblock_with_pending_parents_goes_to_todo(kanban_home): + """unblock_task must re-gate on parent completion (Fix 3). + + A task blocked while parents are still in progress must return to + 'todo' (not 'ready') on unblock. Otherwise the dispatcher will claim + it immediately, repeating Bug 2 from the RCA. + """ + with kb.connect() as conn: + parent = kb.create_task(conn, title="parent", assignee="a") + child = kb.create_task( + conn, title="child", assignee="a", parents=[parent], + ) + # Force child into 'blocked' regardless of parent progress + # (simulates a worker that self-blocked, or an operator block). + conn.execute( + "UPDATE tasks SET status='blocked' WHERE id=?", (child,), + ) + conn.commit() + assert kb.unblock_task(conn, child) + assert kb.get_task(conn, child).status == "todo" + # After parent completes + recompute, the child is ready. + kb.claim_task(conn, parent) + kb.complete_task(conn, parent, result="ok") + kb.recompute_ready(conn) + assert kb.get_task(conn, child).status == "ready" + + +def test_unblock_without_parents_goes_to_ready(kanban_home): + """Parent-free unblock still produces 'ready' (behavior preserved).""" + with kb.connect() as conn: + t = kb.create_task(conn, title="lone", assignee="a") + kb.claim_task(conn, t) + assert kb.block_task(conn, t, reason="need input") + assert kb.unblock_task(conn, t) + assert kb.get_task(conn, t).status == "ready" + + def test_assign_refuses_while_running(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x", assignee="a") diff --git a/tests/stress/test_concurrency_parent_gate.py b/tests/stress/test_concurrency_parent_gate.py new file mode 100644 index 00000000000..406774bad5b --- /dev/null +++ b/tests/stress/test_concurrency_parent_gate.py @@ -0,0 +1,183 @@ +"""Stress test for parent-completion invariant at the claim gate. + +Simulates the create-then-link race described in RCA t_a6acd07d: + + Thread A: repeatedly inserts a child row with status='ready' (racy + writer) and a split-second-later inserts the parent link, + emulating the pre-fix _kanban_create path. + Thread B: repeatedly runs claim_task against every ready task. + +Pass criteria: no task is ever 'claimed' while any of its parents is +not 'done'. The claim_task gate added in hermes_cli/kanban_db.py must +demote such tasks back to 'todo' and emit a 'claim_rejected' event +instead of spawning. + +Run as a script (`python tests/stress/test_concurrency_parent_gate.py`) +or via `pytest --run-stress`. The default pytest collection in +tests/stress/conftest.py ignores *.py globs, so this is a script. +""" +from __future__ import annotations + +import os +import random +import sys +import tempfile +import threading +import time +from pathlib import Path + +WT = str(Path(__file__).resolve().parents[2]) +sys.path.insert(0, WT) + +NUM_CREATE_ROUNDS = 200 +WORKERS_RUN_DURATION_S = 8 + + +def run() -> int: + home = tempfile.mkdtemp(prefix="hermes_parent_gate_stress_") + os.environ["HERMES_HOME"] = home + os.environ["HOME"] = home + + from hermes_cli import kanban_db as kb + + kb.init_db() + + # Seed N parents in 'ready' state. They stay ready for the whole run + # (never 'done'), so every child linked to one of them must remain + # unclaimable. + parent_ids: list[str] = [] + conn = kb.connect() + try: + for i in range(10): + parent_ids.append( + kb.create_task(conn, title=f"parent-{i}", assignee="a") + ) + finally: + conn.close() + + created_children: list[str] = [] + created_lock = threading.Lock() + stop = threading.Event() + violations: list[str] = [] + + def racy_creator() -> None: + """Inserts child rows with status='ready' and links them after. + + This is the pre-fix _kanban_create behavior — the very race + the gate in claim_task must catch. + """ + conn = kb.connect() + try: + for _ in range(NUM_CREATE_ROUNDS): + if stop.is_set(): + return + parents = random.sample(parent_ids, k=2) + # Step 1: insert child WITHOUT parents (ends up ready). + child = kb.create_task( + conn, title="child", assignee="a", parents=[], + ) + # Tiny delay so worker threads get a chance to see the + # ready row before the links are inserted. + time.sleep(random.uniform(0.0001, 0.002)) + # Step 2: add the parent links after the fact. + for p in parents: + try: + kb.link_tasks(conn, parent_id=p, child_id=child) + except Exception: + pass + with created_lock: + created_children.append(child) + finally: + conn.close() + + def worker_loop() -> None: + conn = kb.connect() + try: + end = time.monotonic() + WORKERS_RUN_DURATION_S + while time.monotonic() < end and not stop.is_set(): + row = conn.execute( + "SELECT id FROM tasks WHERE status='ready' " + "AND claim_lock IS NULL ORDER BY RANDOM() LIMIT 1" + ).fetchone() + if row is None: + time.sleep(0.002) + continue + tid = row["id"] + try: + claimed = kb.claim_task(conn, tid, claimer="w") + except Exception: + continue + if claimed is None: + continue + # Invariant: a successful claim on `tid` must mean all + # parents are 'done'. Check in the same connection txn + # so we see the post-claim state. + undone = conn.execute( + "SELECT l.parent_id, p.status FROM task_links l " + "JOIN tasks p ON p.id = l.parent_id " + "WHERE l.child_id = ? AND p.status != 'done'", + (tid,), + ).fetchall() + if undone: + violations.append( + f"claimed {tid} while parents not done: " + + ",".join(f"{r['parent_id']}={r['status']}" for r in undone) + ) + # Release so the run doesn't leak and the next round sees ready. + kb.complete_task(conn, tid, result="stress-ok") + finally: + conn.close() + + creator = threading.Thread(target=racy_creator, daemon=True) + workers = [threading.Thread(target=worker_loop, daemon=True) + for _ in range(4)] + creator.start() + for w in workers: + w.start() + creator.join() + # Give the workers a chance to fully drain ready rows before we stop. + time.sleep(0.5) + stop.set() + for w in workers: + w.join(timeout=WORKERS_RUN_DURATION_S + 2) + + # Post-run audit: the DB event log must show no 'claimed' event on any + # task whose parents were not 'done' at the time of the claim. + conn = kb.connect() + try: + bad = conn.execute( + """ + WITH claims AS ( + SELECT task_id, created_at AS t + FROM task_events WHERE kind='claimed' + ) + SELECT c.task_id, l.parent_id, p.status, p.completed_at + FROM claims c + JOIN task_links l ON l.child_id = c.task_id + JOIN tasks p ON p.id = l.parent_id + WHERE p.completed_at IS NULL OR p.completed_at > c.t + """ + ).fetchall() + rejections = conn.execute( + "SELECT COUNT(*) FROM task_events WHERE kind='claim_rejected'" + ).fetchone()[0] + finally: + conn.close() + + print(f"children created: {len(created_children)}") + print(f"violations: {len(violations)}") + print(f"event-log bad: {len(bad)}") + print(f"claim_rejected: {rejections}") + + if violations or bad: + for v in violations[:10]: + print(" VIOLATION:", v) + for row in list(bad)[:10]: + print(" EVENT-LOG BAD:", dict(row)) + return 1 + print("PARENT-GATE INVARIANT HELD UNDER RACE") + return 0 + + +if __name__ == "__main__": + sys.exit(run())