mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
fix(kanban): make claim ttl configurable
Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
parent
86279160b0
commit
0f620138b0
2 changed files with 86 additions and 10 deletions
|
|
@ -96,13 +96,38 @@ VALID_WORKSPACE_KINDS = {"scratch", "worktree", "dir"}
|
|||
KNOWN_TOOLSET_NAMES = frozenset(name.casefold() for name in get_toolset_names())
|
||||
_IS_WINDOWS = sys.platform == "win32"
|
||||
|
||||
# A running task's claim is valid for 15 minutes; after that the next
|
||||
# dispatcher tick reclaims it. Workers that outlive this window should call
|
||||
# ``heartbeat_claim(task_id)`` periodically. In practice most kanban
|
||||
# workloads either finish within 15m or set a longer claim explicitly.
|
||||
# A running task's claim is valid for 15 minutes by default; after that the
|
||||
# next dispatcher tick reclaims it. Workers that outlive this window should
|
||||
# call ``heartbeat_claim(task_id)`` periodically. In practice most kanban
|
||||
# workloads either finish within 15m, set a longer claim explicitly, or use
|
||||
# ``HERMES_KANBAN_CLAIM_TTL_SECONDS`` to raise the default claim window for
|
||||
# long single-call MCP workflows.
|
||||
DEFAULT_CLAIM_TTL_SECONDS = 15 * 60
|
||||
|
||||
|
||||
def _resolve_claim_ttl_seconds(ttl_seconds: Optional[int] = None) -> int:
    """Compute the claim TTL, in seconds, that should actually be applied.

    Precedence, highest first:

    1. An explicit ``ttl_seconds`` argument, coerced with ``int()`` and
       clamped to a minimum of one second.
    2. ``HERMES_KANBAN_CLAIM_TTL_SECONDS`` from the environment, when it
       parses as a strictly positive integer.
    3. ``DEFAULT_CLAIM_TTL_SECONDS``.

    An unset, blank, malformed, or non-positive environment value is
    ignored without raising so existing installs keep working.
    """
    if ttl_seconds is None:
        env_value = os.environ.get("HERMES_KANBAN_CLAIM_TTL_SECONDS", "").strip()
        try:
            # Blank means "not configured"; treat it like an invalid value.
            env_ttl = int(env_value) if env_value else 0
        except ValueError:
            env_ttl = 0
        return env_ttl if env_ttl > 0 else DEFAULT_CLAIM_TTL_SECONDS

    # Call-site values always win over the environment.
    return max(1, int(ttl_seconds))
|
||||
|
||||
|
||||
# Worker-context caps so build_worker_context() stays bounded on
|
||||
# pathological boards (retry-heavy tasks, comment storms, giant
|
||||
# summaries). Values chosen to fit a typical 100k-char LLM prompt with
|
||||
|
|
@ -1898,7 +1923,7 @@ def claim_task(
|
|||
conn: sqlite3.Connection,
|
||||
task_id: str,
|
||||
*,
|
||||
ttl_seconds: int = DEFAULT_CLAIM_TTL_SECONDS,
|
||||
ttl_seconds: Optional[int] = None,
|
||||
claimer: Optional[str] = None,
|
||||
) -> Optional[Task]:
|
||||
"""Atomically transition ``ready -> running``.
|
||||
|
|
@ -1908,7 +1933,7 @@ def claim_task(
|
|||
"""
|
||||
now = int(time.time())
|
||||
lock = claimer or _claimer_id()
|
||||
expires = now + int(ttl_seconds)
|
||||
expires = now + _resolve_claim_ttl_seconds(ttl_seconds)
|
||||
with write_txn(conn):
|
||||
# Structural invariant: never transition ready -> running while any
|
||||
# parent is not yet 'done'. This is the single enforcement point
|
||||
|
|
@ -2012,7 +2037,7 @@ def heartbeat_claim(
|
|||
conn: sqlite3.Connection,
|
||||
task_id: str,
|
||||
*,
|
||||
ttl_seconds: int = DEFAULT_CLAIM_TTL_SECONDS,
|
||||
ttl_seconds: Optional[int] = None,
|
||||
claimer: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""Extend a running claim. Returns True if we still own it.
|
||||
|
|
@ -2020,7 +2045,7 @@ def heartbeat_claim(
|
|||
Workers that know they'll exceed 15 minutes should call this every
|
||||
few minutes to keep ownership.
|
||||
"""
|
||||
expires = int(time.time()) + int(ttl_seconds)
|
||||
expires = int(time.time()) + _resolve_claim_ttl_seconds(ttl_seconds)
|
||||
lock = claimer or _claimer_id()
|
||||
with write_txn(conn):
|
||||
cur = conn.execute(
|
||||
|
|
@ -2073,7 +2098,7 @@ def release_stale_claims(
|
|||
lock = row["claim_lock"] or ""
|
||||
host_local = lock.startswith(host_prefix)
|
||||
if host_local and row["worker_pid"] and _pid_alive(row["worker_pid"]):
|
||||
new_expires = now + int(DEFAULT_CLAIM_TTL_SECONDS)
|
||||
new_expires = now + _resolve_claim_ttl_seconds()
|
||||
with write_txn(conn):
|
||||
cur = conn.execute(
|
||||
"UPDATE tasks SET claim_expires = ? "
|
||||
|
|
@ -3976,7 +4001,7 @@ def dispatch_once(
|
|||
conn: sqlite3.Connection,
|
||||
*,
|
||||
spawn_fn=None,
|
||||
ttl_seconds: int = DEFAULT_CLAIM_TTL_SECONDS,
|
||||
ttl_seconds: Optional[int] = None,
|
||||
dry_run: bool = False,
|
||||
max_spawn: Optional[int] = None,
|
||||
failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT,
|
||||
|
|
|
|||
|
|
@ -186,6 +186,16 @@ def test_claim_once_wins_second_loses(kanban_home):
|
|||
assert second is None
|
||||
|
||||
|
||||
def test_claim_uses_env_default_ttl(kanban_home, monkeypatch):
    """A claim made without an explicit TTL picks up the env override."""
    monkeypatch.setenv("HERMES_KANBAN_CLAIM_TTL_SECONDS", "3600")
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="x", assignee="a")
        kb.claim_task(conn, task_id, claimer="host:1")
        claim_expires = kb.get_task(conn, task_id).claim_expires
        assert claim_expires is not None
        # Lower bound leaves slack below the 3600s override.
        earliest_acceptable = int(time.time()) + 3000
        assert claim_expires > earliest_acceptable
|
||||
|
||||
|
||||
def test_claim_fails_on_non_ready(kanban_home):
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="x")
|
||||
|
|
@ -267,6 +277,33 @@ def test_stale_claim_with_live_pid_extends_instead_of_reclaiming(
|
|||
assert "reclaimed" not in kinds
|
||||
|
||||
|
||||
def test_stale_claim_with_live_pid_uses_env_ttl_override(
    kanban_home, monkeypatch,
):
    """An expired claim whose worker PID is reported alive is extended
    using the env-configured TTL instead of being reclaimed."""
    import hermes_cli.kanban_db as _kb

    monkeypatch.setenv("HERMES_KANBAN_CLAIM_TTL_SECONDS", "3600")

    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="x", assignee="a")
        # Build a claimer id that shares this host's prefix.
        host = _kb._claimer_id().split(":", 1)[0]
        kb.claim_task(conn, task_id, claimer=f"{host}:worker")
        kb._set_worker_pid(conn, task_id, 12345)

        # Backdate the claim so it is already expired.
        already_expired = int(time.time()) - 60
        conn.execute(
            "UPDATE tasks SET claim_expires = ? WHERE id = ?",
            (already_expired, task_id),
        )

        # Pretend the recorded worker PID is still running.
        monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: True)
        reclaimed_count = kb.release_stale_claims(
            conn, signal_fn=lambda _p, _s: None
        )
        assert reclaimed_count == 0

        refreshed = kb.get_task(conn, task_id)
        assert refreshed is not None
        assert refreshed.claim_expires is not None
        # Lower bound leaves slack below the 3600s override.
        assert refreshed.claim_expires > int(time.time()) + 3000
|
||||
|
||||
|
||||
def test_stale_claim_reclaim_event_records_diagnostic_payload(
|
||||
kanban_home, monkeypatch,
|
||||
):
|
||||
|
|
@ -427,6 +464,20 @@ def test_heartbeat_extends_claim(kanban_home):
|
|||
assert new > int(time.time()) + 3000
|
||||
|
||||
|
||||
def test_heartbeat_uses_env_default_ttl(kanban_home, monkeypatch):
    """A heartbeat without an explicit TTL extends the claim by the env
    override, even when the original claim used a short explicit TTL."""
    monkeypatch.setenv("HERMES_KANBAN_CLAIM_TTL_SECONDS", "3600")
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="x", assignee="a")
        claimer = "host:hb"
        # Explicit 60s TTL here; the heartbeat below passes none.
        kb.claim_task(conn, task_id, claimer=claimer, ttl_seconds=60)
        # Zero the expiry so any extension is attributable to the heartbeat.
        conn.execute(
            "UPDATE tasks SET claim_expires = ? WHERE id = ?", (0, task_id)
        )
        still_owned = kb.heartbeat_claim(conn, task_id, claimer=claimer)
        assert still_owned
        extended_expiry = kb.get_task(conn, task_id).claim_expires
        assert extended_expiry is not None
        assert extended_expiry > int(time.time()) + 3000
|
||||
|
||||
|
||||
def test_concurrent_claims_only_one_wins(kanban_home):
|
||||
"""Fire N threads claiming the same task; exactly one must win."""
|
||||
with kb.connect() as conn:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue