fix(kanban): measure max runtime from current run

This commit is contained in:
澪 / Mio 2026-05-04 11:08:09 +09:00 committed by Teknium
parent 6d302b340e
commit b28ab4fc3f
3 changed files with 75 additions and 9 deletions

View file

@ -2712,16 +2712,23 @@ def enforce_max_runtime(
host_prefix = f"{_claimer_id().split(':', 1)[0]}:"
rows = conn.execute(
"SELECT id, worker_pid, started_at, max_runtime_seconds, claim_lock "
"FROM tasks "
"WHERE status = 'running' AND max_runtime_seconds IS NOT NULL "
" AND started_at IS NOT NULL AND worker_pid IS NOT NULL"
"SELECT t.id, t.worker_pid, "
" COALESCE(r.started_at, t.started_at) AS active_started_at, "
" t.max_runtime_seconds, t.claim_lock "
"FROM tasks t "
"LEFT JOIN task_runs r ON r.id = t.current_run_id "
"WHERE t.status = 'running' AND t.max_runtime_seconds IS NOT NULL "
" AND COALESCE(r.started_at, t.started_at) IS NOT NULL "
" AND t.worker_pid IS NOT NULL"
).fetchall()
for row in rows:
lock = row["claim_lock"] or ""
if not lock.startswith(host_prefix):
continue
elapsed = now - int(row["started_at"])
# Runtime is per attempt, not lifetime-of-task. ``tasks.started_at``
# intentionally records the first time a task ever started, so retries
# must be measured from the active task_runs row when present.
elapsed = now - int(row["active_started_at"])
if elapsed < int(row["max_runtime_seconds"]):
continue

View file

@ -682,14 +682,21 @@ def test_max_runtime_terminates_overrun_worker(kanban_home):
conn, title="long job", assignee="worker",
max_runtime_seconds=1, # one second cap
)
# Spawn by hand: claim + set pid + set started_at to the past.
# Spawn by hand: claim + set pid + set active run start to the past.
kb.claim_task(conn, tid)
kb._set_worker_pid(conn, tid, os.getpid()) # any live pid works
# Backdate started_at so elapsed > limit.
# Backdate both the task-level first-start timestamp and the active
# run timestamp so elapsed > limit under the per-run runtime model.
old_started = int(time.time()) - 30
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET started_at = ? WHERE id = ?",
(int(time.time()) - 30, tid),
(old_started, tid),
)
conn.execute(
"UPDATE task_runs SET started_at = ? "
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
(old_started, tid),
)
timed_out = kb.enforce_max_runtime(conn, signal_fn=_signal_fn)
@ -769,10 +776,16 @@ def test_enforce_max_runtime_integrates_with_dispatch(kanban_home, monkeypatch):
)
kb.claim_task(conn, tid)
kb._set_worker_pid(conn, tid, os.getpid())
old_started = int(time.time()) - 30
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET started_at = ? WHERE id = ?",
(int(time.time()) - 30, tid),
(old_started, tid),
)
conn.execute(
"UPDATE task_runs SET started_at = ? "
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
(old_started, tid),
)
# Use enforce_max_runtime directly with our signal stub — dispatch_once
# uses the default os.kill, but integration-wise calling

View file

@ -182,6 +182,52 @@ def test_stale_claim_reclaimed(kanban_home):
assert kb.get_task(conn, t).status == "ready"
def test_max_runtime_uses_current_run_start_after_retry(kanban_home):
"""A retry should get a fresh max-runtime window.
``tasks.started_at`` intentionally records the first time the task ever
started. Runtime enforcement must therefore use the active
``task_runs.started_at`` row; otherwise every retry of an old task is
immediately timed out again.
"""
with kb.connect() as conn:
host = kb._claimer_id().split(":", 1)[0]
t = kb.create_task(
conn, title="retry", assignee="a", max_runtime_seconds=10,
)
kb.claim_task(conn, t, claimer=f"{host}:first")
first_run_id = kb.latest_run(conn, t).id
old_started = int(time.time()) - 20
conn.execute(
"UPDATE tasks SET started_at = ?, worker_pid = ? WHERE id = ?",
(old_started, 999999, t),
)
conn.execute(
"UPDATE task_runs SET started_at = ?, worker_pid = ? WHERE id = ?",
(old_started, 999999, first_run_id),
)
timed_out = kb.enforce_max_runtime(conn, signal_fn=lambda _pid, _sig: None)
assert timed_out == [t]
assert kb.get_task(conn, t).status == "ready"
kb.claim_task(conn, t, claimer=f"{host}:retry")
retry_run = kb.latest_run(conn, t)
conn.execute(
"UPDATE tasks SET worker_pid = ? WHERE id = ?",
(999999, t),
)
conn.execute(
"UPDATE task_runs SET worker_pid = ? WHERE id = ?",
(999999, retry_run.id),
)
timed_out = kb.enforce_max_runtime(conn, signal_fn=lambda _pid, _sig: None)
assert timed_out == []
assert kb.get_task(conn, t).status == "running"
def test_heartbeat_extends_claim(kanban_home):
with kb.connect() as conn:
t = kb.create_task(conn, title="x", assignee="a")