mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-31 06:51:29 +00:00
fix(kanban): promote blocked tasks when parent dependencies complete
recompute_ready only scanned 'todo' tasks for promotion, ignoring 'blocked' tasks entirely. When a task was blocked (e.g. by the circuit breaker) and its parent dependencies later completed, the task stayed stuck in 'blocked' forever unless manually unblocked. Now recompute_ready also scans 'blocked' tasks. When all parents are done/archived, the blocked task is promoted to 'ready' with failure counters reset — equivalent to an automatic unblock. Includes a regression test for the blocked-parent-done promotion path.
This commit is contained in:
parent
bc961c13f3
commit
40c1decb3b
2 changed files with 56 additions and 6 deletions
|
|
@@ -1849,21 +1849,32 @@ def recompute_ready(conn: sqlite3.Connection) -> int:
|
||||||
promoted = 0
|
promoted = 0
|
||||||
with write_txn(conn):
|
with write_txn(conn):
|
||||||
todo_rows = conn.execute(
|
todo_rows = conn.execute(
|
||||||
"SELECT id FROM tasks WHERE status = 'todo'"
|
"SELECT id, status FROM tasks WHERE status IN ('todo', 'blocked')"
|
||||||
).fetchall()
|
).fetchall()
|
||||||
for row in todo_rows:
|
for row in todo_rows:
|
||||||
task_id = row["id"]
|
task_id = row["id"]
|
||||||
|
cur_status = row["status"]
|
||||||
parents = conn.execute(
|
parents = conn.execute(
|
||||||
"SELECT t.status FROM tasks t "
|
"SELECT t.status FROM tasks t "
|
||||||
"JOIN task_links l ON l.parent_id = t.id "
|
"JOIN task_links l ON l.parent_id = t.id "
|
||||||
"WHERE l.child_id = ?",
|
"WHERE l.child_id = ?",
|
||||||
(task_id,),
|
(task_id,),
|
||||||
).fetchall()
|
).fetchall()
|
||||||
if all(p["status"] in {"done", "archived"} for p in parents):
|
if all(p["status"] in ("done", "archived") for p in parents):
|
||||||
conn.execute(
|
# Blocked tasks also get their failure counters reset —
|
||||||
"UPDATE tasks SET status = 'ready' WHERE id = ? AND status = 'todo'",
|
# this is effectively an auto-unblock.
|
||||||
(task_id,),
|
if cur_status == "blocked":
|
||||||
)
|
conn.execute(
|
||||||
|
"UPDATE tasks SET status = 'ready', "
|
||||||
|
"consecutive_failures = 0, last_failure_error = NULL "
|
||||||
|
"WHERE id = ? AND status = 'blocked'",
|
||||||
|
(task_id,),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE tasks SET status = 'ready' WHERE id = ? AND status = 'todo'",
|
||||||
|
(task_id,),
|
||||||
|
)
|
||||||
_append_event(conn, task_id, "promoted", None)
|
_append_event(conn, task_id, "promoted", None)
|
||||||
promoted += 1
|
promoted += 1
|
||||||
return promoted
|
return promoted
|
||||||
|
|
@@ -3554,6 +3565,17 @@ def set_max_runtime(
|
||||||
return cur.rowcount == 1
|
return cur.rowcount == 1
|
||||||
|
|
||||||
|
|
||||||
|
def _error_fingerprint(error_text: str) -> str:
|
||||||
|
"""Normalize an error message for grouping identical failures.
|
||||||
|
|
||||||
|
Strips host-specific details (PIDs, timestamps) so that errors
|
||||||
|
with the same root cause produce the same fingerprint.
|
||||||
|
"""
|
||||||
|
fp = re.sub(r'\bpid \d+\b', 'pid N', error_text[:80])
|
||||||
|
fp = re.sub(r'\b\d{10,}\b', '<TS>', fp)
|
||||||
|
return fp.lower().strip()
|
||||||
|
|
||||||
|
|
||||||
def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
|
def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
|
||||||
"""Reclaim ``running`` tasks whose worker PID is no longer alive.
|
"""Reclaim ``running`` tasks whose worker PID is no longer alive.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@@ -134,6 +134,34 @@ def test_recompute_ready_cascades_through_chain(kanban_home):
|
||||||
assert kb.get_task(conn, c).status == "ready"
|
assert kb.get_task(conn, c).status == "ready"
|
||||||
|
|
||||||
|
|
||||||
|
def test_recompute_ready_promotes_blocked_with_done_parents(kanban_home):
|
||||||
|
"""blocked tasks with all parents done should be promoted to ready."""
|
||||||
|
with kb.connect() as conn:
|
||||||
|
parent = kb.create_task(conn, title="parent", assignee="a")
|
||||||
|
child = kb.create_task(
|
||||||
|
conn, title="child", assignee="a", parents=[parent],
|
||||||
|
)
|
||||||
|
# Complete the parent
|
||||||
|
kb.claim_task(conn, parent)
|
||||||
|
kb.complete_task(conn, parent, result="ok")
|
||||||
|
# Manually block the child (simulates a worker that failed
|
||||||
|
# after the parent finished)
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE tasks SET status='blocked', consecutive_failures=5, "
|
||||||
|
"last_failure_error='persistent error' WHERE id=?",
|
||||||
|
(child,),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
assert kb.get_task(conn, child).status == "blocked"
|
||||||
|
# recompute_ready should promote blocked → ready and reset failures
|
||||||
|
promoted = kb.recompute_ready(conn)
|
||||||
|
assert promoted == 1
|
||||||
|
task = kb.get_task(conn, child)
|
||||||
|
assert task.status == "ready"
|
||||||
|
assert task.consecutive_failures == 0
|
||||||
|
assert task.last_failure_error is None
|
||||||
|
|
||||||
|
|
||||||
def test_recompute_ready_fan_in_waits_for_all_parents(kanban_home):
|
def test_recompute_ready_fan_in_waits_for_all_parents(kanban_home):
|
||||||
with kb.connect() as conn:
|
with kb.connect() as conn:
|
||||||
a = kb.create_task(conn, title="a")
|
a = kb.create_task(conn, title="a")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue