guard kanban worker lifecycle by run id

This commit is contained in:
misery-hl 2026-05-04 09:39:47 -07:00 committed by Teknium
parent f0d278412f
commit 56b4795115
5 changed files with 243 additions and 36 deletions

View file

@ -943,7 +943,12 @@ def _cmd_init(args: argparse.Namespace) -> int:
def _cmd_heartbeat(args: argparse.Namespace) -> int:
with kb.connect() as conn:
ok = kb.heartbeat_worker(conn, args.task_id, note=getattr(args, "note", None))
ok = kb.heartbeat_worker(
conn,
args.task_id,
note=getattr(args, "note", None),
expected_run_id=_worker_run_id_for(args.task_id),
)
if not ok:
print(f"cannot heartbeat {args.task_id} (not running?)", file=sys.stderr)
return 1
@ -1406,6 +1411,18 @@ def _cmd_comment(args: argparse.Namespace) -> int:
return 0
def _worker_run_id_for(task_id: str) -> Optional[int]:
    """Return the worker run id from the environment for ``task_id``, if any.

    The id is read from ``HERMES_KANBAN_RUN_ID`` and is only honoured when
    ``HERMES_KANBAN_TASK`` names the same task. ``None`` is returned when
    the task does not match, when the run-id variable is unset or empty,
    or when its value cannot be parsed as an integer.
    """
    environ = os.environ
    if environ.get("HERMES_KANBAN_TASK") == task_id:
        run_id_text = environ.get("HERMES_KANBAN_RUN_ID")
        if run_id_text:
            try:
                return int(run_id_text)
            except ValueError:
                # Unparseable value: fall through and report "no run id".
                pass
    return None
def _cmd_complete(args: argparse.Namespace) -> int:
"""Mark one or more tasks done. Supports a single id or a list."""
ids = list(args.task_ids or [])
@ -1442,6 +1459,7 @@ def _cmd_complete(args: argparse.Namespace) -> int:
result=args.result,
summary=summary,
metadata=metadata,
expected_run_id=_worker_run_id_for(tid),
):
failed.append(tid)
print(f"cannot complete {tid} (unknown id or terminal state)", file=sys.stderr)
@ -1487,7 +1505,12 @@ def _cmd_block(args: argparse.Namespace) -> int:
for tid in ids:
if reason:
kb.add_comment(conn, tid, author, f"BLOCKED: {reason}")
if not kb.block_task(conn, tid, reason=reason):
if not kb.block_task(
conn,
tid,
reason=reason,
expected_run_id=_worker_run_id_for(tid),
):
failed.append(tid)
print(f"cannot block {tid}", file=sys.stderr)
else:

View file

@ -2098,6 +2098,7 @@ def complete_task(
summary: Optional[str] = None,
metadata: Optional[dict] = None,
created_cards: Optional[Iterable[str]] = None,
expected_run_id: Optional[int] = None,
) -> bool:
"""Transition ``running|ready -> done`` and record ``result``.
@ -2157,20 +2158,37 @@ def complete_task(
verified_cards = []
with write_txn(conn):
cur = conn.execute(
"""
UPDATE tasks
SET status = 'done',
result = ?,
completed_at = ?,
claim_lock = NULL,
claim_expires= NULL,
worker_pid = NULL
WHERE id = ?
AND status IN ('running', 'ready', 'blocked')
""",
(result, now, task_id),
)
if expected_run_id is None:
cur = conn.execute(
"""
UPDATE tasks
SET status = 'done',
result = ?,
completed_at = ?,
claim_lock = NULL,
claim_expires= NULL,
worker_pid = NULL
WHERE id = ?
AND status IN ('running', 'ready', 'blocked')
""",
(result, now, task_id),
)
else:
cur = conn.execute(
"""
UPDATE tasks
SET status = 'done',
result = ?,
completed_at = ?,
claim_lock = NULL,
claim_expires= NULL,
worker_pid = NULL
WHERE id = ?
AND status IN ('running', 'ready', 'blocked')
AND current_run_id = ?
""",
(result, now, task_id, int(expected_run_id)),
)
if cur.rowcount != 1:
return False
run_id = _end_run(
@ -2310,21 +2328,37 @@ def block_task(
task_id: str,
*,
reason: Optional[str] = None,
expected_run_id: Optional[int] = None,
) -> bool:
"""Transition ``running -> blocked``."""
with write_txn(conn):
cur = conn.execute(
"""
UPDATE tasks
SET status = 'blocked',
claim_lock = NULL,
claim_expires= NULL,
worker_pid = NULL
WHERE id = ?
AND status IN ('running', 'ready')
""",
(task_id,),
)
if expected_run_id is None:
cur = conn.execute(
"""
UPDATE tasks
SET status = 'blocked',
claim_lock = NULL,
claim_expires= NULL,
worker_pid = NULL
WHERE id = ?
AND status IN ('running', 'ready')
""",
(task_id,),
)
else:
cur = conn.execute(
"""
UPDATE tasks
SET status = 'blocked',
claim_lock = NULL,
claim_expires= NULL,
worker_pid = NULL
WHERE id = ?
AND status IN ('running', 'ready')
AND current_run_id = ?
""",
(task_id, int(expected_run_id)),
)
if cur.rowcount != 1:
return False
run_id = _end_run(
@ -2596,6 +2630,7 @@ def heartbeat_worker(
task_id: str,
*,
note: Optional[str] = None,
expected_run_id: Optional[int] = None,
) -> bool:
"""Record a ``heartbeat`` event + touch ``last_heartbeat_at``.
@ -2609,14 +2644,25 @@ def heartbeat_worker(
"""
now = int(time.time())
with write_txn(conn):
cur = conn.execute(
"UPDATE tasks SET last_heartbeat_at = ? "
"WHERE id = ? AND status = 'running'",
(now, task_id),
)
if expected_run_id is None:
cur = conn.execute(
"UPDATE tasks SET last_heartbeat_at = ? "
"WHERE id = ? AND status = 'running'",
(now, task_id),
)
else:
cur = conn.execute(
"UPDATE tasks SET last_heartbeat_at = ? "
"WHERE id = ? AND status = 'running' AND current_run_id = ?",
(now, task_id, int(expected_run_id)),
)
if cur.rowcount != 1:
return False
run_id = _current_run_id(conn, task_id)
run_id = (
int(expected_run_id)
if expected_run_id is not None
else _current_run_id(conn, task_id)
)
if run_id is not None:
conn.execute(
"UPDATE task_runs SET last_heartbeat_at = ? WHERE id = ?",
@ -3219,6 +3265,10 @@ def _default_spawn(
env["HERMES_TENANT"] = task.tenant
env["HERMES_KANBAN_TASK"] = task.id
env["HERMES_KANBAN_WORKSPACE"] = workspace
if task.current_run_id is not None:
env["HERMES_KANBAN_RUN_ID"] = str(task.current_run_id)
if task.claim_lock:
env["HERMES_KANBAN_CLAIM_LOCK"] = task.claim_lock
# Pin the shared board + workspaces root the dispatcher resolved, so
# that even when the worker activates a profile (`hermes -p <name>`
# rewrites HERMES_HOME), its kanban paths still match the