mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-19 04:52:06 +00:00
fix(kanban): gate claim + unblock on parent completion
Enforce the parent-completion invariant at claim_task (the single ready->running chokepoint) and re-gate unblock_task so blocked->ready only fires when parents are done. Prevents child tasks from running ahead of in-progress parents under the create-then-link race. Also adds a stress test that races concurrent create+link against hammered claim_task and asserts no child runs while any parent is undone. Ref: kanban/boards/cookai/workspaces/t_a6acd07d/root-cause.md Refs: t_8d6af9d6
This commit is contained in:
parent
79694018f8
commit
cda20eec0c
3 changed files with 344 additions and 4 deletions
|
|
@ -1804,6 +1804,31 @@ def claim_task(
|
|||
lock = claimer or _claimer_id()
|
||||
expires = now + int(ttl_seconds)
|
||||
with write_txn(conn):
|
||||
# Structural invariant: never transition ready -> running while any
|
||||
# parent is not yet 'done'. This is the single enforcement point
|
||||
# regardless of which writer (create_task, link_tasks, unblock_task,
|
||||
# release_stale_claims, manual SQL) set status='ready'. If a racy
|
||||
# writer promoted a task with undone parents, demote it back to
|
||||
# 'todo' here — recompute_ready will re-promote when the parents
|
||||
# actually finish. See RCA at
|
||||
# kanban/boards/cookai/workspaces/t_a6acd07d/root-cause.md.
|
||||
undone = conn.execute(
|
||||
"SELECT 1 FROM task_links l "
|
||||
"JOIN tasks p ON p.id = l.parent_id "
|
||||
"WHERE l.child_id = ? AND p.status != 'done' LIMIT 1",
|
||||
(task_id,),
|
||||
).fetchone()
|
||||
if undone:
|
||||
conn.execute(
|
||||
"UPDATE tasks SET status = 'todo' "
|
||||
"WHERE id = ? AND status = 'ready'",
|
||||
(task_id,),
|
||||
)
|
||||
_append_event(
|
||||
conn, task_id, "claim_rejected",
|
||||
{"reason": "parents_not_done"},
|
||||
)
|
||||
return None
|
||||
# Defensive: if a prior run somehow leaked (invariant violation from
|
||||
# an unknown code path), close it as 'reclaimed' so we don't strand
|
||||
# it when the CAS resets the pointer below. No-op when the invariant
|
||||
|
|
@ -2503,14 +2528,30 @@ def unblock_task(conn: sqlite3.Connection, task_id: str) -> bool:
|
|||
""",
|
||||
(now, int(stale["current_run_id"])),
|
||||
)
|
||||
cur = conn.execute(
|
||||
"UPDATE tasks SET status = 'ready', current_run_id = NULL "
|
||||
"WHERE id = ? AND status = 'blocked'",
|
||||
# Re-gate on parent completion before flipping 'blocked' back to
|
||||
# 'ready'. Unconditionally setting status='ready' here bypasses the
|
||||
# parent-completion invariant (the dispatcher trusts that column);
|
||||
# if parents are still in progress the task must wait in 'todo'
|
||||
# until recompute_ready picks it up. RCA: Bug 2 at
|
||||
# kanban/boards/cookai/workspaces/t_a6acd07d/root-cause.md.
|
||||
undone_parents = conn.execute(
|
||||
"SELECT 1 FROM task_links l "
|
||||
"JOIN tasks p ON p.id = l.parent_id "
|
||||
"WHERE l.child_id = ? AND p.status != 'done' LIMIT 1",
|
||||
(task_id,),
|
||||
).fetchone()
|
||||
new_status = "todo" if undone_parents else "ready"
|
||||
cur = conn.execute(
|
||||
"UPDATE tasks SET status = ?, current_run_id = NULL "
|
||||
"WHERE id = ? AND status = 'blocked'",
|
||||
(new_status, task_id),
|
||||
)
|
||||
if cur.rowcount != 1:
|
||||
return False
|
||||
_append_event(conn, task_id, "unblocked", None)
|
||||
_append_event(
|
||||
conn, task_id, "unblocked",
|
||||
{"status": new_status} if new_status != "ready" else None,
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -298,6 +298,122 @@ def test_block_then_unblock(kanban_home):
|
|||
assert kb.get_task(conn, t).status == "ready"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Parent-completion invariant at the claim gate (RCA t_a6acd07d)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_claim_rejects_when_parents_not_done(kanban_home):
|
||||
"""claim_task must refuse ready->running if any parent isn't 'done'.
|
||||
|
||||
Simulates the create-then-link race: a task gets status='ready' via a
|
||||
racy writer while it still has undone parents. The claim gate must
|
||||
detect the violation, demote the child back to 'todo', append a
|
||||
'claim_rejected' event, and return None. Covers Fix 1 of the RCA.
|
||||
"""
|
||||
with kb.connect() as conn:
|
||||
parent = kb.create_task(conn, title="parent", assignee="a")
|
||||
child = kb.create_task(
|
||||
conn, title="child", assignee="a", parents=[parent],
|
||||
)
|
||||
# Child correctly starts 'todo' because parent is not 'done'.
|
||||
assert kb.get_task(conn, child).status == "todo"
|
||||
# Simulate the race: a racy writer force-promotes the child to
|
||||
# 'ready' while parent is still pending.
|
||||
conn.execute(
|
||||
"UPDATE tasks SET status='ready' WHERE id=?", (child,),
|
||||
)
|
||||
conn.commit()
|
||||
assert kb.get_task(conn, child).status == "ready"
|
||||
|
||||
result = kb.claim_task(conn, child, claimer="host:1")
|
||||
|
||||
assert result is None
|
||||
with kb.connect() as conn:
|
||||
assert kb.get_task(conn, child).status == "todo"
|
||||
events = conn.execute(
|
||||
"SELECT kind, payload FROM task_events "
|
||||
"WHERE task_id = ? ORDER BY id",
|
||||
(child,),
|
||||
).fetchall()
|
||||
kinds = [e["kind"] for e in events]
|
||||
assert "claim_rejected" in kinds
|
||||
# No 'claimed' event was emitted for the blocked attempt.
|
||||
assert "claimed" not in kinds
|
||||
|
||||
|
||||
def test_claim_succeeds_once_parents_done(kanban_home):
|
||||
"""After parents complete, recompute_ready -> claim_task must succeed."""
|
||||
with kb.connect() as conn:
|
||||
parent = kb.create_task(conn, title="parent", assignee="a")
|
||||
child = kb.create_task(
|
||||
conn, title="child", assignee="a", parents=[parent],
|
||||
)
|
||||
kb.claim_task(conn, parent)
|
||||
assert kb.complete_task(conn, parent, result="ok")
|
||||
kb.recompute_ready(conn)
|
||||
assert kb.get_task(conn, child).status == "ready"
|
||||
claimed = kb.claim_task(conn, child, claimer="host:1")
|
||||
assert claimed is not None
|
||||
assert claimed.status == "running"
|
||||
|
||||
|
||||
def test_create_with_parents_stays_todo_until_parents_done(kanban_home):
|
||||
"""kanban_create(parents=[...]) must land in 'todo' and only promote on parent done."""
|
||||
with kb.connect() as conn:
|
||||
parent = kb.create_task(conn, title="parent", assignee="a")
|
||||
child = kb.create_task(
|
||||
conn, title="child", assignee="a", parents=[parent],
|
||||
)
|
||||
assert kb.get_task(conn, child).status == "todo"
|
||||
# Dispatcher tick between create and some later event must NOT
|
||||
# produce a winner for this child.
|
||||
promoted = kb.recompute_ready(conn)
|
||||
assert promoted == 0
|
||||
assert kb.get_task(conn, child).status == "todo"
|
||||
# Complete parent; complete_task internally runs recompute_ready,
|
||||
# which promotes the child to 'ready'.
|
||||
kb.claim_task(conn, parent)
|
||||
kb.complete_task(conn, parent, result="ok")
|
||||
assert kb.get_task(conn, child).status == "ready"
|
||||
|
||||
|
||||
def test_unblock_with_pending_parents_goes_to_todo(kanban_home):
|
||||
"""unblock_task must re-gate on parent completion (Fix 3).
|
||||
|
||||
A task blocked while parents are still in progress must return to
|
||||
'todo' (not 'ready') on unblock. Otherwise the dispatcher will claim
|
||||
it immediately, repeating Bug 2 from the RCA.
|
||||
"""
|
||||
with kb.connect() as conn:
|
||||
parent = kb.create_task(conn, title="parent", assignee="a")
|
||||
child = kb.create_task(
|
||||
conn, title="child", assignee="a", parents=[parent],
|
||||
)
|
||||
# Force child into 'blocked' regardless of parent progress
|
||||
# (simulates a worker that self-blocked, or an operator block).
|
||||
conn.execute(
|
||||
"UPDATE tasks SET status='blocked' WHERE id=?", (child,),
|
||||
)
|
||||
conn.commit()
|
||||
assert kb.unblock_task(conn, child)
|
||||
assert kb.get_task(conn, child).status == "todo"
|
||||
# After parent completes + recompute, the child is ready.
|
||||
kb.claim_task(conn, parent)
|
||||
kb.complete_task(conn, parent, result="ok")
|
||||
kb.recompute_ready(conn)
|
||||
assert kb.get_task(conn, child).status == "ready"
|
||||
|
||||
|
||||
def test_unblock_without_parents_goes_to_ready(kanban_home):
|
||||
"""Parent-free unblock still produces 'ready' (behavior preserved)."""
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="lone", assignee="a")
|
||||
kb.claim_task(conn, t)
|
||||
assert kb.block_task(conn, t, reason="need input")
|
||||
assert kb.unblock_task(conn, t)
|
||||
assert kb.get_task(conn, t).status == "ready"
|
||||
|
||||
|
||||
def test_assign_refuses_while_running(kanban_home):
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="x", assignee="a")
|
||||
|
|
|
|||
183
tests/stress/test_concurrency_parent_gate.py
Normal file
183
tests/stress/test_concurrency_parent_gate.py
Normal file
|
|
@ -0,0 +1,183 @@
|
|||
"""Stress test for parent-completion invariant at the claim gate.
|
||||
|
||||
Simulates the create-then-link race described in RCA t_a6acd07d:
|
||||
|
||||
Thread A: repeatedly inserts a child row with status='ready' (racy
|
||||
writer) and a split-second-later inserts the parent link,
|
||||
emulating the pre-fix _kanban_create path.
|
||||
Thread B: repeatedly runs claim_task against every ready task.
|
||||
|
||||
Pass criteria: no task is ever 'claimed' while any of its parents is
|
||||
not 'done'. The claim_task gate added in hermes_cli/kanban_db.py must
|
||||
demote such tasks back to 'todo' and emit a 'claim_rejected' event
|
||||
instead of spawning.
|
||||
|
||||
Run as a script (`python tests/stress/test_concurrency_parent_gate.py`)
|
||||
or via `pytest --run-stress`. The default pytest collection in
|
||||
tests/stress/conftest.py ignores *.py globs, so this is a script.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
WT = str(Path(__file__).resolve().parents[2])
|
||||
sys.path.insert(0, WT)
|
||||
|
||||
NUM_CREATE_ROUNDS = 200
|
||||
WORKERS_RUN_DURATION_S = 8
|
||||
|
||||
|
||||
def run() -> int:
|
||||
home = tempfile.mkdtemp(prefix="hermes_parent_gate_stress_")
|
||||
os.environ["HERMES_HOME"] = home
|
||||
os.environ["HOME"] = home
|
||||
|
||||
from hermes_cli import kanban_db as kb
|
||||
|
||||
kb.init_db()
|
||||
|
||||
# Seed N parents in 'ready' state. They stay ready for the whole run
|
||||
# (never 'done'), so every child linked to one of them must remain
|
||||
# unclaimable.
|
||||
parent_ids: list[str] = []
|
||||
conn = kb.connect()
|
||||
try:
|
||||
for i in range(10):
|
||||
parent_ids.append(
|
||||
kb.create_task(conn, title=f"parent-{i}", assignee="a")
|
||||
)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
created_children: list[str] = []
|
||||
created_lock = threading.Lock()
|
||||
stop = threading.Event()
|
||||
violations: list[str] = []
|
||||
|
||||
def racy_creator() -> None:
|
||||
"""Inserts child rows with status='ready' and links them after.
|
||||
|
||||
This is the pre-fix _kanban_create behavior — the very race
|
||||
the gate in claim_task must catch.
|
||||
"""
|
||||
conn = kb.connect()
|
||||
try:
|
||||
for _ in range(NUM_CREATE_ROUNDS):
|
||||
if stop.is_set():
|
||||
return
|
||||
parents = random.sample(parent_ids, k=2)
|
||||
# Step 1: insert child WITHOUT parents (ends up ready).
|
||||
child = kb.create_task(
|
||||
conn, title="child", assignee="a", parents=[],
|
||||
)
|
||||
# Tiny delay so worker threads get a chance to see the
|
||||
# ready row before the links are inserted.
|
||||
time.sleep(random.uniform(0.0001, 0.002))
|
||||
# Step 2: add the parent links after the fact.
|
||||
for p in parents:
|
||||
try:
|
||||
kb.link_tasks(conn, parent_id=p, child_id=child)
|
||||
except Exception:
|
||||
pass
|
||||
with created_lock:
|
||||
created_children.append(child)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def worker_loop() -> None:
|
||||
conn = kb.connect()
|
||||
try:
|
||||
end = time.monotonic() + WORKERS_RUN_DURATION_S
|
||||
while time.monotonic() < end and not stop.is_set():
|
||||
row = conn.execute(
|
||||
"SELECT id FROM tasks WHERE status='ready' "
|
||||
"AND claim_lock IS NULL ORDER BY RANDOM() LIMIT 1"
|
||||
).fetchone()
|
||||
if row is None:
|
||||
time.sleep(0.002)
|
||||
continue
|
||||
tid = row["id"]
|
||||
try:
|
||||
claimed = kb.claim_task(conn, tid, claimer="w")
|
||||
except Exception:
|
||||
continue
|
||||
if claimed is None:
|
||||
continue
|
||||
# Invariant: a successful claim on `tid` must mean all
|
||||
# parents are 'done'. Check in the same connection txn
|
||||
# so we see the post-claim state.
|
||||
undone = conn.execute(
|
||||
"SELECT l.parent_id, p.status FROM task_links l "
|
||||
"JOIN tasks p ON p.id = l.parent_id "
|
||||
"WHERE l.child_id = ? AND p.status != 'done'",
|
||||
(tid,),
|
||||
).fetchall()
|
||||
if undone:
|
||||
violations.append(
|
||||
f"claimed {tid} while parents not done: "
|
||||
+ ",".join(f"{r['parent_id']}={r['status']}" for r in undone)
|
||||
)
|
||||
# Release so the run doesn't leak and the next round sees ready.
|
||||
kb.complete_task(conn, tid, result="stress-ok")
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
creator = threading.Thread(target=racy_creator, daemon=True)
|
||||
workers = [threading.Thread(target=worker_loop, daemon=True)
|
||||
for _ in range(4)]
|
||||
creator.start()
|
||||
for w in workers:
|
||||
w.start()
|
||||
creator.join()
|
||||
# Give the workers a chance to fully drain ready rows before we stop.
|
||||
time.sleep(0.5)
|
||||
stop.set()
|
||||
for w in workers:
|
||||
w.join(timeout=WORKERS_RUN_DURATION_S + 2)
|
||||
|
||||
# Post-run audit: the DB event log must show no 'claimed' event on any
|
||||
# task whose parents were not 'done' at the time of the claim.
|
||||
conn = kb.connect()
|
||||
try:
|
||||
bad = conn.execute(
|
||||
"""
|
||||
WITH claims AS (
|
||||
SELECT task_id, created_at AS t
|
||||
FROM task_events WHERE kind='claimed'
|
||||
)
|
||||
SELECT c.task_id, l.parent_id, p.status, p.completed_at
|
||||
FROM claims c
|
||||
JOIN task_links l ON l.child_id = c.task_id
|
||||
JOIN tasks p ON p.id = l.parent_id
|
||||
WHERE p.completed_at IS NULL OR p.completed_at > c.t
|
||||
"""
|
||||
).fetchall()
|
||||
rejections = conn.execute(
|
||||
"SELECT COUNT(*) FROM task_events WHERE kind='claim_rejected'"
|
||||
).fetchone()[0]
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
print(f"children created: {len(created_children)}")
|
||||
print(f"violations: {len(violations)}")
|
||||
print(f"event-log bad: {len(bad)}")
|
||||
print(f"claim_rejected: {rejections}")
|
||||
|
||||
if violations or bad:
|
||||
for v in violations[:10]:
|
||||
print(" VIOLATION:", v)
|
||||
for row in list(bad)[:10]:
|
||||
print(" EVENT-LOG BAD:", dict(row))
|
||||
return 1
|
||||
print("PARENT-GATE INVARIANT HELD UNDER RACE")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(run())
|
||||
Loading…
Add table
Add a link
Reference in a new issue