From 0f620138b0a1852fe464a2aad12824df621dd5c1 Mon Sep 17 00:00:00 2001 From: LeonSGP43 <154585401+LeonSGP43@users.noreply.github.com> Date: Mon, 18 May 2026 20:23:26 -0700 Subject: [PATCH] fix(kanban): make claim ttl configurable Co-Authored-By: Paperclip --- hermes_cli/kanban_db.py | 45 ++++++++++++++++++++------ tests/hermes_cli/test_kanban_db.py | 51 ++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 10 deletions(-) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index b1f2d625052..2080d6a6fff 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -96,13 +96,38 @@ VALID_WORKSPACE_KINDS = {"scratch", "worktree", "dir"} KNOWN_TOOLSET_NAMES = frozenset(name.casefold() for name in get_toolset_names()) _IS_WINDOWS = sys.platform == "win32" -# A running task's claim is valid for 15 minutes; after that the next -# dispatcher tick reclaims it. Workers that outlive this window should call -# ``heartbeat_claim(task_id)`` periodically. In practice most kanban -# workloads either finish within 15m or set a longer claim explicitly. +# A running task's claim is valid for 15 minutes by default; after that the +# next dispatcher tick reclaims it. Workers that outlive this window should +# call ``heartbeat_claim(task_id)`` periodically. In practice most kanban +# workloads either finish within 15m, set a longer claim explicitly, or use +# ``HERMES_KANBAN_CLAIM_TTL_SECONDS`` to raise the default claim window for +# long single-call MCP workflows. DEFAULT_CLAIM_TTL_SECONDS = 15 * 60 +def _resolve_claim_ttl_seconds(ttl_seconds: Optional[int] = None) -> int: + """Return the effective claim TTL, honoring the kanban env override. + + Explicit call-site values win. Otherwise a positive integer from + ``HERMES_KANBAN_CLAIM_TTL_SECONDS`` overrides the built-in default. + Invalid or non-positive env values fall back silently so existing + installs keep working. + """ + if ttl_seconds is not None: + return max(1, int(ttl_seconds)) + + raw = os.environ.get("HERMES_KANBAN_CLAIM_TTL_SECONDS", "").strip() + if raw: + try: + parsed = int(raw) + except ValueError: + parsed = 0 + if parsed > 0: + return parsed + + return DEFAULT_CLAIM_TTL_SECONDS + + # Worker-context caps so build_worker_context() stays bounded on # pathological boards (retry-heavy tasks, comment storms, giant # summaries). Values chosen to fit a typical 100k-char LLM prompt with @@ -1898,7 +1923,7 @@ def claim_task( conn: sqlite3.Connection, task_id: str, *, - ttl_seconds: int = DEFAULT_CLAIM_TTL_SECONDS, + ttl_seconds: Optional[int] = None, claimer: Optional[str] = None, ) -> Optional[Task]: """Atomically transition ``ready -> running``. @@ -1908,7 +1933,7 @@ def claim_task( """ now = int(time.time()) lock = claimer or _claimer_id() - expires = now + int(ttl_seconds) + expires = now + _resolve_claim_ttl_seconds(ttl_seconds) with write_txn(conn): # Structural invariant: never transition ready -> running while any # parent is not yet 'done'. This is the single enforcement point @@ -2012,7 +2037,7 @@ def heartbeat_claim( conn: sqlite3.Connection, task_id: str, *, - ttl_seconds: int = DEFAULT_CLAIM_TTL_SECONDS, + ttl_seconds: Optional[int] = None, claimer: Optional[str] = None, ) -> bool: """Extend a running claim. Returns True if we still own it. @@ -2020,7 +2045,7 @@ def heartbeat_claim( Workers that know they'll exceed 15 minutes should call this every few minutes to keep ownership. """ - expires = int(time.time()) + int(ttl_seconds) + expires = int(time.time()) + _resolve_claim_ttl_seconds(ttl_seconds) lock = claimer or _claimer_id() with write_txn(conn): cur = conn.execute( @@ -2073,7 +2098,7 @@ def release_stale_claims( lock = row["claim_lock"] or "" host_local = lock.startswith(host_prefix) if host_local and row["worker_pid"] and _pid_alive(row["worker_pid"]): - new_expires = now + int(DEFAULT_CLAIM_TTL_SECONDS) + new_expires = now + _resolve_claim_ttl_seconds() with write_txn(conn): cur = conn.execute( "UPDATE tasks SET claim_expires = ? " @@ -3976,7 +4001,7 @@ def dispatch_once( conn: sqlite3.Connection, *, spawn_fn=None, - ttl_seconds: int = DEFAULT_CLAIM_TTL_SECONDS, + ttl_seconds: Optional[int] = None, dry_run: bool = False, max_spawn: Optional[int] = None, failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT, diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index f268bc6705f..0332b052b47 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -186,6 +186,16 @@ def test_claim_once_wins_second_loses(kanban_home): assert second is None +def test_claim_uses_env_default_ttl(kanban_home, monkeypatch): + monkeypatch.setenv("HERMES_KANBAN_CLAIM_TTL_SECONDS", "3600") + with kb.connect() as conn: + t = kb.create_task(conn, title="x", assignee="a") + kb.claim_task(conn, t, claimer="host:1") + expires = kb.get_task(conn, t).claim_expires + assert expires is not None + assert expires > int(time.time()) + 3000 + + def test_claim_fails_on_non_ready(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x") @@ -267,6 +277,33 @@ def test_stale_claim_with_live_pid_extends_instead_of_reclaiming( assert "reclaimed" not in kinds +def test_stale_claim_with_live_pid_uses_env_ttl_override( + kanban_home, monkeypatch, +): + import hermes_cli.kanban_db as _kb + + monkeypatch.setenv("HERMES_KANBAN_CLAIM_TTL_SECONDS", "3600") + + with kb.connect() as conn: + t = kb.create_task(conn, title="x", assignee="a") + host = _kb._claimer_id().split(":", 1)[0] + kb.claim_task(conn, t, claimer=f"{host}:worker") + kb._set_worker_pid(conn, t, 12345) + conn.execute( + "UPDATE tasks SET claim_expires = ? WHERE id = ?", + (int(time.time()) - 60, t), + ) + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: True) + reclaimed = kb.release_stale_claims(conn, signal_fn=lambda _p, _s: None) + assert reclaimed == 0 + + task = kb.get_task(conn, t) + assert task is not None + assert task.claim_expires is not None + assert task.claim_expires > int(time.time()) + 3000 + + def test_stale_claim_reclaim_event_records_diagnostic_payload( kanban_home, monkeypatch, ): @@ -427,6 +464,20 @@ def test_heartbeat_extends_claim(kanban_home): assert new > int(time.time()) + 3000 +def test_heartbeat_uses_env_default_ttl(kanban_home, monkeypatch): + monkeypatch.setenv("HERMES_KANBAN_CLAIM_TTL_SECONDS", "3600") + with kb.connect() as conn: + t = kb.create_task(conn, title="x", assignee="a") + claimer = "host:hb" + kb.claim_task(conn, t, claimer=claimer, ttl_seconds=60) + conn.execute("UPDATE tasks SET claim_expires = ? WHERE id = ?", (0, t)) + ok = kb.heartbeat_claim(conn, t, claimer=claimer) + assert ok + new = kb.get_task(conn, t).claim_expires + assert new is not None + assert new > int(time.time()) + 3000 + + def test_concurrent_claims_only_one_wins(kanban_home): """Fire N threads claiming the same task; exactly one must win.""" with kb.connect() as conn: