From b28ab4fc3fab1725be11c86c44ec0b09c32557e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BE=AA=20/=20Mio?= Date: Mon, 4 May 2026 11:08:09 +0900 Subject: [PATCH] fix(kanban): measure max runtime from current run --- hermes_cli/kanban_db.py | 17 +++++-- .../test_kanban_core_functionality.py | 21 +++++++-- tests/hermes_cli/test_kanban_db.py | 46 +++++++++++++++++++ 3 files changed, 75 insertions(+), 9 deletions(-) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 7a02cdf702..3c6c7a1b92 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -2712,16 +2712,23 @@ def enforce_max_runtime( host_prefix = f"{_claimer_id().split(':', 1)[0]}:" rows = conn.execute( - "SELECT id, worker_pid, started_at, max_runtime_seconds, claim_lock " - "FROM tasks " - "WHERE status = 'running' AND max_runtime_seconds IS NOT NULL " - " AND started_at IS NOT NULL AND worker_pid IS NOT NULL" + "SELECT t.id, t.worker_pid, " + " COALESCE(r.started_at, t.started_at) AS active_started_at, " + " t.max_runtime_seconds, t.claim_lock " + "FROM tasks t " + "LEFT JOIN task_runs r ON r.id = t.current_run_id " + "WHERE t.status = 'running' AND t.max_runtime_seconds IS NOT NULL " + " AND COALESCE(r.started_at, t.started_at) IS NOT NULL " + " AND t.worker_pid IS NOT NULL" ).fetchall() for row in rows: lock = row["claim_lock"] or "" if not lock.startswith(host_prefix): continue - elapsed = now - int(row["started_at"]) + # Runtime is per attempt, not lifetime-of-task. ``tasks.started_at`` + # intentionally records the first time a task ever started, so retries + # must be measured from the active task_runs row when present. + elapsed = now - int(row["active_started_at"]) if elapsed < int(row["max_runtime_seconds"]): continue diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py index a9db7489e3..1bf0ad4c7c 100644 --- a/tests/hermes_cli/test_kanban_core_functionality.py +++ b/tests/hermes_cli/test_kanban_core_functionality.py @@ -682,14 +682,21 @@ def test_max_runtime_terminates_overrun_worker(kanban_home): conn, title="long job", assignee="worker", max_runtime_seconds=1, # one second cap ) - # Spawn by hand: claim + set pid + set started_at to the past. + # Spawn by hand: claim + set pid + set active run start to the past. kb.claim_task(conn, tid) kb._set_worker_pid(conn, tid, os.getpid()) # any live pid works - # Backdate started_at so elapsed > limit. + # Backdate both the task-level first-start timestamp and the active + # run timestamp so elapsed > limit under the per-run runtime model. + old_started = int(time.time()) - 30 with kb.write_txn(conn): conn.execute( "UPDATE tasks SET started_at = ? WHERE id = ?", - (int(time.time()) - 30, tid), + (old_started, tid), + ) + conn.execute( + "UPDATE task_runs SET started_at = ? " + "WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)", + (old_started, tid), ) timed_out = kb.enforce_max_runtime(conn, signal_fn=_signal_fn) @@ -769,10 +776,16 @@ def test_enforce_max_runtime_integrates_with_dispatch(kanban_home, monkeypatch): ) kb.claim_task(conn, tid) kb._set_worker_pid(conn, tid, os.getpid()) + old_started = int(time.time()) - 30 with kb.write_txn(conn): conn.execute( "UPDATE tasks SET started_at = ? WHERE id = ?", - (int(time.time()) - 30, tid), + (old_started, tid), + ) + conn.execute( + "UPDATE task_runs SET started_at = ? " + "WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)", + (old_started, tid), ) # Use enforce_max_runtime directly with our signal stub — dispatch_once # uses the default os.kill, but integration-wise calling diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index e6d25a3d84..365aa83113 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -182,6 +182,52 @@ def test_stale_claim_reclaimed(kanban_home): assert kb.get_task(conn, t).status == "ready" +def test_max_runtime_uses_current_run_start_after_retry(kanban_home): + """A retry should get a fresh max-runtime window. + + ``tasks.started_at`` intentionally records the first time the task ever + started. Runtime enforcement must therefore use the active + ``task_runs.started_at`` row; otherwise every retry of an old task is + immediately timed out again. + """ + with kb.connect() as conn: + host = kb._claimer_id().split(":", 1)[0] + t = kb.create_task( + conn, title="retry", assignee="a", max_runtime_seconds=10, + ) + + kb.claim_task(conn, t, claimer=f"{host}:first") + first_run_id = kb.latest_run(conn, t).id + old_started = int(time.time()) - 20 + conn.execute( + "UPDATE tasks SET started_at = ?, worker_pid = ? WHERE id = ?", + (old_started, 999999, t), + ) + conn.execute( + "UPDATE task_runs SET started_at = ?, worker_pid = ? WHERE id = ?", + (old_started, 999999, first_run_id), + ) + + timed_out = kb.enforce_max_runtime(conn, signal_fn=lambda _pid, _sig: None) + assert timed_out == [t] + assert kb.get_task(conn, t).status == "ready" + + kb.claim_task(conn, t, claimer=f"{host}:retry") + retry_run = kb.latest_run(conn, t) + conn.execute( + "UPDATE tasks SET worker_pid = ? WHERE id = ?", + (999999, t), + ) + conn.execute( + "UPDATE task_runs SET worker_pid = ? WHERE id = ?", + (999999, retry_run.id), + ) + + timed_out = kb.enforce_max_runtime(conn, signal_fn=lambda _pid, _sig: None) + assert timed_out == [] + assert kb.get_task(conn, t).status == "running" + + def test_heartbeat_extends_claim(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x", assignee="a")