mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
3084 lines
119 KiB
Python
3084 lines
119 KiB
Python
"""Tests for the Kanban DB layer (hermes_cli.kanban_db)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import concurrent.futures
|
|
import os
|
|
import sqlite3
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from hermes_cli import kanban_db as kb
|
|
|
|
|
|
@pytest.fixture
|
|
def kanban_home(tmp_path, monkeypatch):
|
|
"""Isolated HERMES_HOME with an empty kanban DB."""
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
monkeypatch.setenv("HERMES_HOME", str(home))
|
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
|
kb.init_db()
|
|
return home
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Schema / init
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_init_db_is_idempotent(kanban_home):
|
|
# Second call should not error or drop data.
|
|
with kb.connect() as conn:
|
|
kb.create_task(conn, title="persisted")
|
|
kb.init_db()
|
|
with kb.connect() as conn:
|
|
tasks = kb.list_tasks(conn)
|
|
assert len(tasks) == 1
|
|
assert tasks[0].title == "persisted"
|
|
|
|
|
|
def test_init_creates_expected_tables(kanban_home):
|
|
with kb.connect() as conn:
|
|
rows = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
|
|
).fetchall()
|
|
names = {r["name"] for r in rows}
|
|
assert {"tasks", "task_links", "task_comments", "task_events"} <= names
|
|
|
|
|
|
def test_connect_rejects_tls_record_in_sqlite_header(tmp_path, monkeypatch):
|
|
"""Kanban should classify TLS-looking page-0 clobbers before WAL setup."""
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
monkeypatch.setenv("HERMES_HOME", str(home))
|
|
monkeypatch.delenv("HERMES_KANBAN_DB", raising=False)
|
|
monkeypatch.delenv("HERMES_KANBAN_HOME", raising=False)
|
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
|
|
|
corrupt = home / "kanban.db"
|
|
corrupt.write_bytes(b"SQLit" + bytes.fromhex("17 03 03 00 13") + b"x" * 32)
|
|
|
|
with pytest.raises(sqlite3.DatabaseError) as exc_info:
|
|
kb.connect(board="default")
|
|
|
|
msg = str(exc_info.value)
|
|
assert "file is not a database" in msg
|
|
assert "TLS record header detected at byte offset 5" in msg
|
|
assert "53 51 4c 69 74 17 03 03 00 13" in msg
|
|
|
|
|
|
def test_connect_migrates_legacy_db_before_optional_column_indexes(tmp_path):
|
|
"""Legacy DBs missing additive indexed columns must migrate cleanly.
|
|
|
|
SCHEMA_SQL runs in ``connect()`` before ``_migrate_add_optional_columns``.
|
|
Indexes over additive columns therefore must be created after the
|
|
migration adds those columns, or boards predating the column fail to
|
|
open before migration can run.
|
|
|
|
Covers all four indexes that sit on additive columns:
|
|
- ``tasks.session_id`` -> ``idx_tasks_session_id`` (#28447)
|
|
- ``tasks.tenant`` -> ``idx_tasks_tenant`` (#16081)
|
|
- ``tasks.idempotency_key`` -> ``idx_tasks_idempotency`` (#17805)
|
|
- ``task_events.run_id`` -> ``idx_events_run`` (#17805)
|
|
"""
|
|
db_path = tmp_path / "legacy-kanban.db"
|
|
conn = sqlite3.connect(str(db_path))
|
|
# Pre-#16081 ``tasks`` shape: missing tenant, idempotency_key, session_id.
|
|
conn.execute("""
|
|
CREATE TABLE tasks (
|
|
id TEXT PRIMARY KEY,
|
|
title TEXT NOT NULL,
|
|
body TEXT,
|
|
assignee TEXT,
|
|
status TEXT NOT NULL,
|
|
priority INTEGER NOT NULL DEFAULT 0,
|
|
created_by TEXT,
|
|
created_at INTEGER NOT NULL,
|
|
started_at INTEGER,
|
|
completed_at INTEGER,
|
|
workspace_kind TEXT NOT NULL DEFAULT 'scratch',
|
|
workspace_path TEXT,
|
|
claim_lock TEXT,
|
|
claim_expires INTEGER
|
|
)
|
|
""")
|
|
# Pre-#17805 ``task_events`` shape: missing run_id. Required because
|
|
# ``_migrate_add_optional_columns`` unconditionally runs PRAGMA on
|
|
# ``task_events`` for run_id back-fill.
|
|
conn.execute("""
|
|
CREATE TABLE task_events (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
task_id TEXT NOT NULL,
|
|
kind TEXT NOT NULL,
|
|
payload TEXT,
|
|
created_at INTEGER NOT NULL
|
|
)
|
|
""")
|
|
conn.execute(
|
|
"INSERT INTO tasks (id, title, status, created_at) "
|
|
"VALUES ('legacy', 'old board task', 'ready', 1)"
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
with kb.connect(db_path) as migrated:
|
|
task_columns = {
|
|
row["name"] for row in migrated.execute("PRAGMA table_info(tasks)")
|
|
}
|
|
event_columns = {
|
|
row["name"]
|
|
for row in migrated.execute("PRAGMA table_info(task_events)")
|
|
}
|
|
indexes = {
|
|
row["name"]
|
|
for row in migrated.execute(
|
|
"SELECT name FROM sqlite_master WHERE type = 'index'"
|
|
)
|
|
}
|
|
|
|
# Additive columns added by migration:
|
|
assert "session_id" in task_columns
|
|
assert "tenant" in task_columns
|
|
assert "idempotency_key" in task_columns
|
|
assert "run_id" in event_columns
|
|
# And their indexes — the regression scope of this test:
|
|
assert "idx_tasks_session_id" in indexes
|
|
assert "idx_tasks_tenant" in indexes
|
|
assert "idx_tasks_idempotency" in indexes
|
|
assert "idx_events_run" in indexes
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Task creation + status inference
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_create_task_no_parents_is_ready(kanban_home):
|
|
with kb.connect() as conn:
|
|
tid = kb.create_task(conn, title="ship it", assignee="alice")
|
|
t = kb.get_task(conn, tid)
|
|
assert t is not None
|
|
assert t.status == "ready"
|
|
assert t.assignee == "alice"
|
|
assert t.workspace_kind == "scratch"
|
|
|
|
|
|
def test_create_task_with_parent_is_todo_until_parent_done(kanban_home):
|
|
with kb.connect() as conn:
|
|
p = kb.create_task(conn, title="parent")
|
|
c = kb.create_task(conn, title="child", parents=[p])
|
|
assert kb.get_task(conn, c).status == "todo"
|
|
kb.complete_task(conn, p, result="ok")
|
|
assert kb.get_task(conn, c).status == "ready"
|
|
|
|
|
|
def test_create_task_unknown_parent_errors(kanban_home):
|
|
with kb.connect() as conn, pytest.raises(ValueError, match="unknown parent"):
|
|
kb.create_task(conn, title="orphan", parents=["t_ghost"])
|
|
|
|
|
|
def test_workspace_kind_validation(kanban_home):
|
|
with kb.connect() as conn, pytest.raises(ValueError, match="workspace_kind"):
|
|
kb.create_task(conn, title="bad ws", workspace_kind="cloud")
|
|
|
|
|
|
def test_create_task_persists_worktree_branch_name(kanban_home, tmp_path):
|
|
target = tmp_path / ".worktrees" / "t6-wire"
|
|
with kb.connect() as conn:
|
|
tid = kb.create_task(
|
|
conn,
|
|
title="ship worktree",
|
|
workspace_kind="worktree",
|
|
workspace_path=str(target),
|
|
branch_name=" wt/t6-wire ",
|
|
)
|
|
task = kb.get_task(conn, tid)
|
|
events = kb.list_events(conn, tid)
|
|
context = kb.build_worker_context(conn, tid)
|
|
|
|
assert task.branch_name == "wt/t6-wire"
|
|
assert events[0].payload["branch_name"] == "wt/t6-wire"
|
|
assert "Branch: wt/t6-wire" in context
|
|
|
|
|
|
def test_branch_name_requires_worktree_workspace(kanban_home):
|
|
with kb.connect() as conn, pytest.raises(ValueError, match="worktree"):
|
|
kb.create_task(
|
|
conn,
|
|
title="bad branch",
|
|
workspace_kind="scratch",
|
|
branch_name="wt/bad",
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Links + dependency resolution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_link_demotes_ready_child_to_todo_when_parent_not_done(kanban_home):
|
|
with kb.connect() as conn:
|
|
a = kb.create_task(conn, title="a")
|
|
b = kb.create_task(conn, title="b")
|
|
assert kb.get_task(conn, b).status == "ready"
|
|
kb.link_tasks(conn, a, b)
|
|
assert kb.get_task(conn, b).status == "todo"
|
|
|
|
|
|
def test_link_keeps_ready_child_when_parent_already_done(kanban_home):
|
|
with kb.connect() as conn:
|
|
a = kb.create_task(conn, title="a")
|
|
kb.complete_task(conn, a)
|
|
b = kb.create_task(conn, title="b")
|
|
assert kb.get_task(conn, b).status == "ready"
|
|
kb.link_tasks(conn, a, b)
|
|
assert kb.get_task(conn, b).status == "ready"
|
|
|
|
|
|
def test_link_rejects_self_loop(kanban_home):
|
|
with kb.connect() as conn:
|
|
a = kb.create_task(conn, title="a")
|
|
with pytest.raises(ValueError, match="itself"):
|
|
kb.link_tasks(conn, a, a)
|
|
|
|
|
|
def test_link_detects_cycle(kanban_home):
|
|
with kb.connect() as conn:
|
|
a = kb.create_task(conn, title="a")
|
|
b = kb.create_task(conn, title="b", parents=[a])
|
|
c = kb.create_task(conn, title="c", parents=[b])
|
|
with pytest.raises(ValueError, match="cycle"):
|
|
kb.link_tasks(conn, c, a)
|
|
with pytest.raises(ValueError, match="cycle"):
|
|
kb.link_tasks(conn, b, a)
|
|
|
|
|
|
def test_recompute_ready_cascades_through_chain(kanban_home):
|
|
with kb.connect() as conn:
|
|
a = kb.create_task(conn, title="a")
|
|
b = kb.create_task(conn, title="b", parents=[a])
|
|
c = kb.create_task(conn, title="c", parents=[b])
|
|
assert [kb.get_task(conn, x).status for x in (a, b, c)] == \
|
|
["ready", "todo", "todo"]
|
|
kb.complete_task(conn, a)
|
|
assert kb.get_task(conn, b).status == "ready"
|
|
kb.complete_task(conn, b)
|
|
assert kb.get_task(conn, c).status == "ready"
|
|
|
|
|
|
def test_recompute_ready_promotes_blocked_with_done_parents(kanban_home):
|
|
"""blocked tasks with all parents done should be promoted to ready."""
|
|
with kb.connect() as conn:
|
|
parent = kb.create_task(conn, title="parent", assignee="a")
|
|
child = kb.create_task(
|
|
conn, title="child", assignee="a", parents=[parent],
|
|
)
|
|
# Complete the parent
|
|
kb.claim_task(conn, parent)
|
|
kb.complete_task(conn, parent, result="ok")
|
|
# Manually block the child (simulates a worker that failed
|
|
# after the parent finished)
|
|
conn.execute(
|
|
"UPDATE tasks SET status='blocked', consecutive_failures=5, "
|
|
"last_failure_error='persistent error' WHERE id=?",
|
|
(child,),
|
|
)
|
|
conn.commit()
|
|
assert kb.get_task(conn, child).status == "blocked"
|
|
# recompute_ready should promote blocked → ready and reset failures
|
|
promoted = kb.recompute_ready(conn)
|
|
assert promoted == 1
|
|
task = kb.get_task(conn, child)
|
|
assert task.status == "ready"
|
|
assert task.consecutive_failures == 0
|
|
assert task.last_failure_error is None
|
|
|
|
|
|
def test_recompute_ready_fan_in_waits_for_all_parents(kanban_home):
|
|
with kb.connect() as conn:
|
|
a = kb.create_task(conn, title="a")
|
|
b = kb.create_task(conn, title="b")
|
|
c = kb.create_task(conn, title="c", parents=[a, b])
|
|
kb.complete_task(conn, a)
|
|
assert kb.get_task(conn, c).status == "todo"
|
|
kb.complete_task(conn, b)
|
|
assert kb.get_task(conn, c).status == "ready"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Atomic claim (CAS)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_claim_once_wins_second_loses(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="a")
|
|
first = kb.claim_task(conn, t, claimer="host:1")
|
|
assert first is not None and first.status == "running"
|
|
second = kb.claim_task(conn, t, claimer="host:2")
|
|
assert second is None
|
|
|
|
|
|
def test_claim_uses_env_default_ttl(kanban_home, monkeypatch):
|
|
monkeypatch.setenv("HERMES_KANBAN_CLAIM_TTL_SECONDS", "3600")
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="a")
|
|
kb.claim_task(conn, t, claimer="host:1")
|
|
expires = kb.get_task(conn, t).claim_expires
|
|
assert expires is not None
|
|
assert expires > int(time.time()) + 3000
|
|
|
|
|
|
def test_claim_fails_on_non_ready(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x")
|
|
# Move to todo by introducing an unsatisfied parent.
|
|
p = kb.create_task(conn, title="p")
|
|
kb.link_tasks(conn, p, t)
|
|
assert kb.get_task(conn, t).status == "todo"
|
|
assert kb.claim_task(conn, t) is None
|
|
|
|
|
|
def test_schedule_task_parks_time_delay_without_dispatching(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="delayed recheck", assignee="ops")
|
|
assert kb.schedule_task(conn, t, reason="run next week") is True
|
|
task = kb.get_task(conn, t)
|
|
assert task.status == "scheduled"
|
|
assert kb.claim_task(conn, t) is None
|
|
|
|
events = kb.list_events(conn, t)
|
|
assert any(e.kind == "scheduled" and e.payload == {"reason": "run next week"} for e in events)
|
|
|
|
|
|
def test_unblock_scheduled_rechecks_parent_gate(kanban_home):
|
|
with kb.connect() as conn:
|
|
parent = kb.create_task(conn, title="parent")
|
|
child = kb.create_task(conn, title="child", parents=[parent])
|
|
assert kb.get_task(conn, child).status == "todo"
|
|
assert kb.schedule_task(conn, child, reason="wait until tomorrow") is True
|
|
|
|
assert kb.unblock_task(conn, child) is True
|
|
assert kb.get_task(conn, child).status == "todo"
|
|
|
|
kb.complete_task(conn, parent)
|
|
assert kb.schedule_task(conn, child, reason="second timer") is True
|
|
assert kb.unblock_task(conn, child) is True
|
|
assert kb.get_task(conn, child).status == "ready"
|
|
|
|
|
|
def test_stale_claim_reclaimed(kanban_home, monkeypatch):
|
|
import signal
|
|
import hermes_cli.kanban_db as _kb
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="a")
|
|
host = _kb._claimer_id().split(":", 1)[0]
|
|
kb.claim_task(conn, t, claimer=f"{host}:worker")
|
|
killed: list[int] = []
|
|
|
|
def _signal(_pid, sig):
|
|
killed.append(sig)
|
|
|
|
kb._set_worker_pid(conn, t, 12345)
|
|
# Rewind claim_expires so it looks stale.
|
|
conn.execute(
|
|
"UPDATE tasks SET claim_expires = ? WHERE id = ?",
|
|
(int(time.time()) - 3600, t),
|
|
)
|
|
# Worker PID has died — exactly the case ``release_stale_claims``
|
|
# should still reclaim (post-#23025: live PIDs are now extended).
|
|
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
|
|
reclaimed = kb.release_stale_claims(conn, signal_fn=_signal)
|
|
assert reclaimed == 1
|
|
assert kb.get_task(conn, t).status == "ready"
|
|
assert killed == [signal.SIGTERM]
|
|
|
|
|
|
def test_stale_claim_with_live_pid_extends_instead_of_reclaiming(
|
|
kanban_home, monkeypatch,
|
|
):
|
|
"""A stale-by-TTL claim whose worker PID is still alive should be
|
|
extended, not reclaimed (#23025). Slow models can spend longer than
|
|
``DEFAULT_CLAIM_TTL_SECONDS`` inside a single tool-free LLM call;
|
|
killing those healthy workers produces a respawn loop with zero
|
|
progress."""
|
|
import hermes_cli.kanban_db as _kb
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="a")
|
|
host = _kb._claimer_id().split(":", 1)[0]
|
|
kb.claim_task(conn, t, claimer=f"{host}:worker")
|
|
kb._set_worker_pid(conn, t, 12345)
|
|
|
|
old_expires = int(time.time()) - 60
|
|
conn.execute(
|
|
"UPDATE tasks SET claim_expires = ? WHERE id = ?",
|
|
(old_expires, t),
|
|
)
|
|
|
|
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: True)
|
|
killed: list[int] = []
|
|
reclaimed = kb.release_stale_claims(
|
|
conn, signal_fn=lambda _p, sig: killed.append(sig),
|
|
)
|
|
assert reclaimed == 0
|
|
task = kb.get_task(conn, t)
|
|
assert task.status == "running"
|
|
assert task.claim_expires is not None
|
|
assert task.claim_expires > old_expires
|
|
assert killed == [] # live worker not killed
|
|
|
|
kinds = [
|
|
r["kind"] for r in conn.execute(
|
|
"SELECT kind FROM task_events WHERE task_id = ?", (t,),
|
|
).fetchall()
|
|
]
|
|
assert "claim_extended" in kinds
|
|
assert "reclaimed" not in kinds
|
|
|
|
|
|
def test_stale_claim_with_live_pid_uses_env_ttl_override(
|
|
kanban_home, monkeypatch,
|
|
):
|
|
import hermes_cli.kanban_db as _kb
|
|
|
|
monkeypatch.setenv("HERMES_KANBAN_CLAIM_TTL_SECONDS", "3600")
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="a")
|
|
host = _kb._claimer_id().split(":", 1)[0]
|
|
kb.claim_task(conn, t, claimer=f"{host}:worker")
|
|
kb._set_worker_pid(conn, t, 12345)
|
|
conn.execute(
|
|
"UPDATE tasks SET claim_expires = ? WHERE id = ?",
|
|
(int(time.time()) - 60, t),
|
|
)
|
|
|
|
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: True)
|
|
reclaimed = kb.release_stale_claims(conn, signal_fn=lambda _p, _s: None)
|
|
assert reclaimed == 0
|
|
|
|
task = kb.get_task(conn, t)
|
|
assert task is not None
|
|
assert task.claim_expires is not None
|
|
assert task.claim_expires > int(time.time()) + 3000
|
|
|
|
|
|
def test_stale_claim_reclaim_event_records_diagnostic_payload(
|
|
kanban_home, monkeypatch,
|
|
):
|
|
"""``reclaimed`` events should carry claim_expires, last_heartbeat_at,
|
|
and worker_pid so operators can diagnose why a claim went stale
|
|
(#23025: previous payload only had ``stale_lock`` which gives no
|
|
timing context)."""
|
|
import json
|
|
import hermes_cli.kanban_db as _kb
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="a")
|
|
host = _kb._claimer_id().split(":", 1)[0]
|
|
kb.claim_task(conn, t, claimer=f"{host}:worker")
|
|
kb._set_worker_pid(conn, t, 12345)
|
|
old_expires = int(time.time()) - 3600
|
|
hb_at = int(time.time()) - 1800
|
|
conn.execute(
|
|
"UPDATE tasks SET claim_expires = ?, last_heartbeat_at = ? "
|
|
"WHERE id = ?",
|
|
(old_expires, hb_at, t),
|
|
)
|
|
|
|
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
|
|
kb.release_stale_claims(conn, signal_fn=lambda _p, _s: None)
|
|
row = conn.execute(
|
|
"SELECT payload FROM task_events "
|
|
"WHERE task_id = ? AND kind = 'reclaimed'",
|
|
(t,),
|
|
).fetchone()
|
|
assert row is not None
|
|
payload = json.loads(row["payload"])
|
|
assert payload["claim_expires"] == old_expires
|
|
assert payload["last_heartbeat_at"] == hb_at
|
|
assert payload["worker_pid"] == 12345
|
|
assert payload["host_local"] is True
|
|
|
|
|
|
def test_detect_crashed_workers_systemic_failure_fast_block(
|
|
kanban_home, monkeypatch,
|
|
):
|
|
"""When many tasks crash with the same error, trip the breaker faster."""
|
|
import hermes_cli.kanban_db as _kb
|
|
|
|
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
|
|
|
|
with kb.connect() as conn:
|
|
task_ids = []
|
|
for i in range(4):
|
|
tid = kb.create_task(conn, title=f"task-{i}", assignee="a")
|
|
host = _kb._claimer_id().split(":", 1)[0]
|
|
conn.execute(
|
|
"UPDATE tasks SET status='running', worker_pid=?, "
|
|
"claim_lock=? WHERE id=?",
|
|
(90000 + i, f"{host}:w{i}", tid),
|
|
)
|
|
task_ids.append(tid)
|
|
conn.commit()
|
|
|
|
crashed = kb.detect_crashed_workers(conn)
|
|
assert len(crashed) == 4
|
|
|
|
for tid in task_ids:
|
|
task = kb.get_task(conn, tid)
|
|
assert task.status == "blocked", (
|
|
f"task {tid} should be blocked (systemic), got {task.status}"
|
|
)
|
|
|
|
|
|
def test_detect_crashed_workers_isolated_failure_normal_retry(
|
|
kanban_home, monkeypatch,
|
|
):
|
|
"""Below the systemic threshold, tasks retain normal retry budget."""
|
|
import hermes_cli.kanban_db as _kb
|
|
|
|
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
|
|
|
|
with kb.connect() as conn:
|
|
task_ids = []
|
|
for i in range(2):
|
|
tid = kb.create_task(conn, title=f"iso-{i}", assignee="a")
|
|
host = _kb._claimer_id().split(":", 1)[0]
|
|
conn.execute(
|
|
"UPDATE tasks SET status='running', worker_pid=?, "
|
|
"claim_lock=? WHERE id=?",
|
|
(80000 + i, f"{host}:w{i}", tid),
|
|
)
|
|
task_ids.append(tid)
|
|
conn.commit()
|
|
|
|
crashed = kb.detect_crashed_workers(conn)
|
|
assert len(crashed) == 2
|
|
|
|
for tid in task_ids:
|
|
task = kb.get_task(conn, tid)
|
|
assert task.status == "ready", (
|
|
f"task {tid} should stay ready (isolated), got {task.status}"
|
|
)
|
|
|
|
|
|
def test_max_runtime_uses_current_run_start_after_retry(kanban_home, monkeypatch):
|
|
"""A retry should get a fresh max-runtime window.
|
|
|
|
``tasks.started_at`` intentionally records the first time the task ever
|
|
started. Runtime enforcement must therefore use the active
|
|
``task_runs.started_at`` row; otherwise every retry of an old task is
|
|
immediately timed out again.
|
|
"""
|
|
monkeypatch.setattr(kb, "_pid_alive", lambda _pid: False)
|
|
|
|
with kb.connect() as conn:
|
|
host = kb._claimer_id().split(":", 1)[0]
|
|
t = kb.create_task(
|
|
conn, title="retry", assignee="a", max_runtime_seconds=10,
|
|
)
|
|
|
|
kb.claim_task(conn, t, claimer=f"{host}:first")
|
|
first_run_id = kb.latest_run(conn, t).id
|
|
old_started = int(time.time()) - 20
|
|
conn.execute(
|
|
"UPDATE tasks SET started_at = ?, worker_pid = ? WHERE id = ?",
|
|
(old_started, 999999, t),
|
|
)
|
|
conn.execute(
|
|
"UPDATE task_runs SET started_at = ?, worker_pid = ? WHERE id = ?",
|
|
(old_started, 999999, first_run_id),
|
|
)
|
|
|
|
timed_out = kb.enforce_max_runtime(conn, signal_fn=lambda _pid, _sig: None)
|
|
assert timed_out == [t]
|
|
assert kb.get_task(conn, t).status == "ready"
|
|
|
|
kb.claim_task(conn, t, claimer=f"{host}:retry")
|
|
retry_run = kb.latest_run(conn, t)
|
|
conn.execute(
|
|
"UPDATE tasks SET worker_pid = ? WHERE id = ?",
|
|
(999999, t),
|
|
)
|
|
conn.execute(
|
|
"UPDATE task_runs SET worker_pid = ? WHERE id = ?",
|
|
(999999, retry_run.id),
|
|
)
|
|
|
|
timed_out = kb.enforce_max_runtime(conn, signal_fn=lambda _pid, _sig: None)
|
|
assert timed_out == []
|
|
assert kb.get_task(conn, t).status == "running"
|
|
|
|
|
|
def test_heartbeat_extends_claim(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="a")
|
|
claimer = "host:hb"
|
|
kb.claim_task(conn, t, claimer=claimer, ttl_seconds=60)
|
|
original = kb.get_task(conn, t).claim_expires
|
|
# Rewind then heartbeat.
|
|
conn.execute("UPDATE tasks SET claim_expires = ? WHERE id = ?", (0, t))
|
|
ok = kb.heartbeat_claim(conn, t, claimer=claimer, ttl_seconds=3600)
|
|
assert ok
|
|
new = kb.get_task(conn, t).claim_expires
|
|
assert new > int(time.time()) + 3000
|
|
|
|
|
|
def test_heartbeat_uses_env_default_ttl(kanban_home, monkeypatch):
|
|
monkeypatch.setenv("HERMES_KANBAN_CLAIM_TTL_SECONDS", "3600")
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="a")
|
|
claimer = "host:hb"
|
|
kb.claim_task(conn, t, claimer=claimer, ttl_seconds=60)
|
|
conn.execute("UPDATE tasks SET claim_expires = ? WHERE id = ?", (0, t))
|
|
ok = kb.heartbeat_claim(conn, t, claimer=claimer)
|
|
assert ok
|
|
new = kb.get_task(conn, t).claim_expires
|
|
assert new is not None
|
|
assert new > int(time.time()) + 3000
|
|
|
|
|
|
def test_concurrent_claims_only_one_wins(kanban_home):
|
|
"""Fire N threads claiming the same task; exactly one must win."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="race", assignee="a")
|
|
|
|
def attempt(i):
|
|
with kb.connect() as c:
|
|
return kb.claim_task(c, t, claimer=f"host:{i}")
|
|
|
|
n_workers = 8
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as ex:
|
|
results = list(ex.map(attempt, range(n_workers)))
|
|
winners = [r for r in results if r is not None]
|
|
assert len(winners) == 1
|
|
assert winners[0].status == "running"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Complete / block / unblock / archive / assign
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_complete_records_result(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x")
|
|
assert kb.complete_task(conn, t, result="done and dusted")
|
|
task = kb.get_task(conn, t)
|
|
assert task.status == "done"
|
|
assert task.result == "done and dusted"
|
|
assert task.completed_at is not None
|
|
|
|
|
|
def test_block_then_unblock(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="a")
|
|
kb.claim_task(conn, t)
|
|
assert kb.block_task(conn, t, reason="need input")
|
|
assert kb.get_task(conn, t).status == "blocked"
|
|
assert kb.unblock_task(conn, t)
|
|
assert kb.get_task(conn, t).status == "ready"
|
|
|
|
|
|
def test_unblock_resets_failure_counters(kanban_home):
|
|
"""unblock_task must reset consecutive_failures and last_failure_error."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="a")
|
|
kb.claim_task(conn, t)
|
|
assert kb.block_task(conn, t, reason="need input")
|
|
# Simulate accumulated failures from the circuit breaker
|
|
conn.execute(
|
|
"UPDATE tasks SET consecutive_failures = 5, "
|
|
"last_failure_error = 'test error' WHERE id = ?",
|
|
(t,),
|
|
)
|
|
conn.commit()
|
|
assert kb.unblock_task(conn, t)
|
|
task = kb.get_task(conn, t)
|
|
assert task.status == "ready"
|
|
assert task.consecutive_failures == 0
|
|
assert task.last_failure_error is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parent-completion invariant at the claim gate (RCA t_a6acd07d)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_claim_rejects_when_parents_not_done(kanban_home):
|
|
"""claim_task must refuse ready->running if any parent isn't 'done'.
|
|
|
|
Simulates the create-then-link race: a task gets status='ready' via a
|
|
racy writer while it still has undone parents. The claim gate must
|
|
detect the violation, demote the child back to 'todo', append a
|
|
'claim_rejected' event, and return None. Covers Fix 1 of the RCA.
|
|
"""
|
|
with kb.connect() as conn:
|
|
parent = kb.create_task(conn, title="parent", assignee="a")
|
|
child = kb.create_task(
|
|
conn, title="child", assignee="a", parents=[parent],
|
|
)
|
|
# Child correctly starts 'todo' because parent is not 'done'.
|
|
assert kb.get_task(conn, child).status == "todo"
|
|
# Simulate the race: a racy writer force-promotes the child to
|
|
# 'ready' while parent is still pending.
|
|
conn.execute(
|
|
"UPDATE tasks SET status='ready' WHERE id=?", (child,),
|
|
)
|
|
conn.commit()
|
|
assert kb.get_task(conn, child).status == "ready"
|
|
|
|
result = kb.claim_task(conn, child, claimer="host:1")
|
|
|
|
assert result is None
|
|
with kb.connect() as conn:
|
|
assert kb.get_task(conn, child).status == "todo"
|
|
events = conn.execute(
|
|
"SELECT kind, payload FROM task_events "
|
|
"WHERE task_id = ? ORDER BY id",
|
|
(child,),
|
|
).fetchall()
|
|
kinds = [e["kind"] for e in events]
|
|
assert "claim_rejected" in kinds
|
|
# No 'claimed' event was emitted for the blocked attempt.
|
|
assert "claimed" not in kinds
|
|
|
|
|
|
def test_claim_succeeds_once_parents_done(kanban_home):
|
|
"""After parents complete, recompute_ready -> claim_task must succeed."""
|
|
with kb.connect() as conn:
|
|
parent = kb.create_task(conn, title="parent", assignee="a")
|
|
child = kb.create_task(
|
|
conn, title="child", assignee="a", parents=[parent],
|
|
)
|
|
kb.claim_task(conn, parent)
|
|
assert kb.complete_task(conn, parent, result="ok")
|
|
kb.recompute_ready(conn)
|
|
assert kb.get_task(conn, child).status == "ready"
|
|
claimed = kb.claim_task(conn, child, claimer="host:1")
|
|
assert claimed is not None
|
|
assert claimed.status == "running"
|
|
|
|
|
|
def test_create_with_parents_stays_todo_until_parents_done(kanban_home):
|
|
"""kanban_create(parents=[...]) must land in 'todo' and only promote on parent done."""
|
|
with kb.connect() as conn:
|
|
parent = kb.create_task(conn, title="parent", assignee="a")
|
|
child = kb.create_task(
|
|
conn, title="child", assignee="a", parents=[parent],
|
|
)
|
|
assert kb.get_task(conn, child).status == "todo"
|
|
# Dispatcher tick between create and some later event must NOT
|
|
# produce a winner for this child.
|
|
promoted = kb.recompute_ready(conn)
|
|
assert promoted == 0
|
|
assert kb.get_task(conn, child).status == "todo"
|
|
# Complete parent; complete_task internally runs recompute_ready,
|
|
# which promotes the child to 'ready'.
|
|
kb.claim_task(conn, parent)
|
|
kb.complete_task(conn, parent, result="ok")
|
|
assert kb.get_task(conn, child).status == "ready"
|
|
|
|
|
|
def test_unblock_with_pending_parents_goes_to_todo(kanban_home):
|
|
"""unblock_task must re-gate on parent completion (Fix 3).
|
|
|
|
A task blocked while parents are still in progress must return to
|
|
'todo' (not 'ready') on unblock. Otherwise the dispatcher will claim
|
|
it immediately, repeating Bug 2 from the RCA.
|
|
"""
|
|
with kb.connect() as conn:
|
|
parent = kb.create_task(conn, title="parent", assignee="a")
|
|
child = kb.create_task(
|
|
conn, title="child", assignee="a", parents=[parent],
|
|
)
|
|
# Force child into 'blocked' regardless of parent progress
|
|
# (simulates a worker that self-blocked, or an operator block).
|
|
conn.execute(
|
|
"UPDATE tasks SET status='blocked' WHERE id=?", (child,),
|
|
)
|
|
conn.commit()
|
|
assert kb.unblock_task(conn, child)
|
|
assert kb.get_task(conn, child).status == "todo"
|
|
# After parent completes + recompute, the child is ready.
|
|
kb.claim_task(conn, parent)
|
|
kb.complete_task(conn, parent, result="ok")
|
|
kb.recompute_ready(conn)
|
|
assert kb.get_task(conn, child).status == "ready"
|
|
|
|
|
|
def test_unblock_without_parents_goes_to_ready(kanban_home):
|
|
"""Parent-free unblock still produces 'ready' (behavior preserved)."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="lone", assignee="a")
|
|
kb.claim_task(conn, t)
|
|
assert kb.block_task(conn, t, reason="need input")
|
|
assert kb.unblock_task(conn, t)
|
|
assert kb.get_task(conn, t).status == "ready"
|
|
|
|
|
|
def test_assign_refuses_while_running(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="a")
|
|
kb.claim_task(conn, t)
|
|
with pytest.raises(RuntimeError, match="currently running"):
|
|
kb.assign_task(conn, t, "b")
|
|
|
|
|
|
def test_assign_reassigns_when_not_running(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="a")
|
|
assert kb.assign_task(conn, t, "b")
|
|
assert kb.get_task(conn, t).assignee == "b"
|
|
|
|
|
|
def test_assignee_normalized_to_lowercase_on_create_and_assign(kanban_home):
|
|
"""Dashboard/CLI may pass title-cased profile labels; DB + spawn use canonical id."""
|
|
with kb.connect() as conn:
|
|
tid = kb.create_task(conn, title="cased", assignee="Jules")
|
|
assert kb.get_task(conn, tid).assignee == "jules"
|
|
assert kb.assign_task(conn, tid, "Librarian")
|
|
assert kb.get_task(conn, tid).assignee == "librarian"
|
|
|
|
|
|
def test_list_tasks_assignee_filter_case_insensitive(kanban_home):
|
|
with kb.connect() as conn:
|
|
tid = kb.create_task(conn, title="q", assignee="jules")
|
|
found = kb.list_tasks(conn, assignee="Jules")
|
|
assert len(found) == 1 and found[0].id == tid
|
|
|
|
|
|
def test_archive_hides_from_default_list(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x")
|
|
kb.complete_task(conn, t)
|
|
assert kb.archive_task(conn, t)
|
|
assert len(kb.list_tasks(conn)) == 0
|
|
assert len(kb.list_tasks(conn, include_archived=True)) == 1
|
|
|
|
|
|
def test_delete_archived_task_removes_related_rows(kanban_home):
|
|
with kb.connect() as conn:
|
|
parent = kb.create_task(conn, title="parent")
|
|
tid = kb.create_task(conn, title="child", parents=[parent], assignee="worker")
|
|
kb.add_comment(conn, tid, "user", "cleanup me")
|
|
kb.claim_task(conn, tid)
|
|
kb.complete_task(conn, tid, result="done")
|
|
assert kb.archive_task(conn, tid)
|
|
conn.execute(
|
|
"INSERT INTO kanban_notify_subs(task_id, platform, chat_id, thread_id, user_id, created_at, last_event_id) "
|
|
"VALUES (?, 'telegram', '123', '', 'u', 0, 0)",
|
|
(tid,),
|
|
)
|
|
conn.commit()
|
|
|
|
assert kb.delete_archived_task(conn, tid) is True
|
|
assert kb.get_task(conn, tid) is None
|
|
assert conn.execute("SELECT COUNT(*) FROM task_links WHERE child_id = ? OR parent_id = ?", (tid, tid)).fetchone()[0] == 0
|
|
assert conn.execute("SELECT COUNT(*) FROM task_comments WHERE task_id = ?", (tid,)).fetchone()[0] == 0
|
|
assert conn.execute("SELECT COUNT(*) FROM task_events WHERE task_id = ?", (tid,)).fetchone()[0] == 0
|
|
assert conn.execute("SELECT COUNT(*) FROM task_runs WHERE task_id = ?", (tid,)).fetchone()[0] == 0
|
|
assert conn.execute("SELECT COUNT(*) FROM kanban_notify_subs WHERE task_id = ?", (tid,)).fetchone()[0] == 0
|
|
|
|
|
|
def test_delete_archived_task_rejects_non_archived_rows(kanban_home):
|
|
with kb.connect() as conn:
|
|
tid = kb.create_task(conn, title="live")
|
|
assert kb.delete_archived_task(conn, tid) is False
|
|
assert kb.get_task(conn, tid) is not None
|
|
|
|
|
|
def test_list_tasks_order_by(kanban_home):
|
|
with kb.connect() as conn:
|
|
# Create tasks with different titles and priorities
|
|
t_a = kb.create_task(conn, title="alpha", priority=1)
|
|
t_b = kb.create_task(conn, title="beta", priority=2)
|
|
t_c = kb.create_task(conn, title="gamma", priority=1)
|
|
|
|
# Default sort: priority DESC, created ASC
|
|
default = kb.list_tasks(conn)
|
|
assert [t.id for t in default] == [t_b, t_a, t_c]
|
|
|
|
# Sort by title ASC
|
|
by_title = kb.list_tasks(conn, order_by="title")
|
|
assert [t.id for t in by_title] == [t_a, t_b, t_c]
|
|
|
|
# Sort by assignee
|
|
kb.assign_task(conn, t_a, "alice")
|
|
kb.assign_task(conn, t_b, "bob")
|
|
kb.assign_task(conn, t_c, "alice")
|
|
by_assignee = kb.list_tasks(conn, order_by="assignee")
|
|
# alice's tasks first (alphabetically), then bob's
|
|
assignees = [t.assignee for t in by_assignee]
|
|
assert assignees[:2] == ["alice", "alice"]
|
|
assert assignees[2] == "bob"
|
|
|
|
# Invalid sort order raises ValueError
|
|
try:
|
|
kb.list_tasks(conn, order_by="bogus")
|
|
assert False, "Should have raised ValueError"
|
|
except ValueError as e:
|
|
assert "order_by must be one of" in str(e)
|
|
|
|
def test_delete_task_removes_task_and_cascades(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="to-delete", assignee="alice")
|
|
kb.add_comment(conn, t, "user", "comment")
|
|
kb.add_comment(conn, t, "user", "another")
|
|
assert kb.delete_task(conn, t)
|
|
assert kb.get_task(conn, t) is None
|
|
assert len(kb.list_comments(conn, t)) == 0
|
|
assert len(kb.list_events(conn, t)) == 0
|
|
assert len(kb.list_runs(conn, t)) == 0
|
|
|
|
|
|
def test_delete_task_returns_false_for_missing_task(kanban_home):
|
|
with kb.connect() as conn:
|
|
assert not kb.delete_task(conn, "t_nonexistent")
|
|
|
|
|
|
def test_delete_task_cascades_links(kanban_home):
|
|
with kb.connect() as conn:
|
|
p = kb.create_task(conn, title="parent")
|
|
c = kb.create_task(conn, title="child", parents=[p])
|
|
child = kb.get_task(conn, c)
|
|
assert child is not None and child.status == "todo"
|
|
kb.delete_task(conn, p)
|
|
assert kb.get_task(conn, p) is None
|
|
child_after = kb.get_task(conn, c)
|
|
assert child_after is not None and child_after.status == "ready"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Comments / events / worker context
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_comments_recorded_in_order(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x")
|
|
kb.add_comment(conn, t, "user", "first")
|
|
kb.add_comment(conn, t, "researcher", "second")
|
|
comments = kb.list_comments(conn, t)
|
|
assert [c.body for c in comments] == ["first", "second"]
|
|
assert [c.author for c in comments] == ["user", "researcher"]
|
|
|
|
|
|
def test_empty_comment_rejected(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x")
|
|
with pytest.raises(ValueError, match="body is required"):
|
|
kb.add_comment(conn, t, "user", "")
|
|
|
|
|
|
def test_events_capture_lifecycle(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="a")
|
|
kb.claim_task(conn, t)
|
|
kb.complete_task(conn, t, result="ok")
|
|
events = kb.list_events(conn, t)
|
|
kinds = [e.kind for e in events]
|
|
assert "created" in kinds
|
|
assert "claimed" in kinds
|
|
assert "completed" in kinds
|
|
|
|
|
|
def test_worker_context_includes_parent_results_and_comments(kanban_home):
|
|
with kb.connect() as conn:
|
|
p = kb.create_task(conn, title="p")
|
|
kb.complete_task(conn, p, result="PARENT_RESULT_MARKER")
|
|
c = kb.create_task(conn, title="child", parents=[p])
|
|
kb.add_comment(conn, c, "user", "CLARIFICATION_MARKER")
|
|
ctx = kb.build_worker_context(conn, c)
|
|
assert "PARENT_RESULT_MARKER" in ctx
|
|
assert "CLARIFICATION_MARKER" in ctx
|
|
assert c in ctx
|
|
assert "child" in ctx
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dispatcher
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_dispatch_dry_run_does_not_claim(kanban_home, all_assignees_spawnable):
|
|
with kb.connect() as conn:
|
|
t1 = kb.create_task(conn, title="a", assignee="alice")
|
|
t2 = kb.create_task(conn, title="b", assignee="bob")
|
|
res = kb.dispatch_once(conn, dry_run=True)
|
|
assert {s[0] for s in res.spawned} == {t1, t2}
|
|
with kb.connect() as conn:
|
|
# Dry run must NOT mutate status.
|
|
assert kb.get_task(conn, t1).status == "ready"
|
|
assert kb.get_task(conn, t2).status == "ready"
|
|
|
|
|
|
def test_dispatch_skips_unassigned(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="floater")
|
|
res = kb.dispatch_once(conn, dry_run=True)
|
|
assert t in res.skipped_unassigned
|
|
assert t not in res.skipped_nonspawnable
|
|
assert not res.spawned
|
|
|
|
|
|
def test_dispatch_skips_nonspawnable_into_separate_bucket(kanban_home, monkeypatch):
|
|
"""Tasks whose assignee fails profile_exists() must NOT land in
|
|
``skipped_unassigned`` (which is operator-actionable) — they go in
|
|
the dedicated ``skipped_nonspawnable`` bucket so health telemetry
|
|
can suppress false-positive "stuck" warnings."""
|
|
from hermes_cli import profiles
|
|
monkeypatch.setattr(profiles, "profile_exists", lambda name: False)
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="for-terminal", assignee="orion-cc")
|
|
res = kb.dispatch_once(conn, dry_run=True)
|
|
assert t in res.skipped_nonspawnable
|
|
assert t not in res.skipped_unassigned
|
|
assert not res.spawned
|
|
|
|
|
|
def test_has_spawnable_ready_false_when_only_terminal_lanes(kanban_home, monkeypatch):
|
|
"""``has_spawnable_ready`` returns False when every ready task is
|
|
assigned to a control-plane lane — used by gateway/CLI dispatchers
|
|
to silence the stuck-warn while terminals still have queued work."""
|
|
from hermes_cli import profiles
|
|
monkeypatch.setattr(profiles, "profile_exists", lambda name: False)
|
|
with kb.connect() as conn:
|
|
kb.create_task(conn, title="t1", assignee="orion-cc")
|
|
kb.create_task(conn, title="t2", assignee="orion-research")
|
|
assert kb.has_spawnable_ready(conn) is False
|
|
|
|
|
|
def test_has_spawnable_ready_true_when_real_profile_present(kanban_home, monkeypatch):
|
|
"""``has_spawnable_ready`` returns True as soon as ANY ready task
|
|
has an assignee that maps to a real Hermes profile — preserves the
|
|
real "stuck" signal when a daily/agent task is queued."""
|
|
from hermes_cli import profiles
|
|
monkeypatch.setattr(
|
|
profiles, "profile_exists", lambda name: name == "daily"
|
|
)
|
|
with kb.connect() as conn:
|
|
kb.create_task(conn, title="terminal-task", assignee="orion-cc")
|
|
kb.create_task(conn, title="hermes-task", assignee="daily")
|
|
assert kb.has_spawnable_ready(conn) is True
|
|
|
|
|
|
def test_has_spawnable_ready_false_on_empty_queue(kanban_home):
|
|
"""Empty queue is the trivial false case — no ready tasks at all."""
|
|
with kb.connect() as conn:
|
|
assert kb.has_spawnable_ready(conn) is False
|
|
|
|
|
|
def test_dispatch_promotes_ready_and_spawns(kanban_home, all_assignees_spawnable):
|
|
spawns = []
|
|
|
|
def fake_spawn(task, workspace):
|
|
spawns.append((task.id, task.assignee, workspace))
|
|
|
|
with kb.connect() as conn:
|
|
p = kb.create_task(conn, title="p", assignee="alice")
|
|
c = kb.create_task(conn, title="c", assignee="bob", parents=[p])
|
|
# Finish parent outside dispatch; promotion happens inside.
|
|
kb.complete_task(conn, p)
|
|
res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
|
|
# Spawned c (a was already done when dispatch was called).
|
|
assert len(spawns) == 1
|
|
assert spawns[0][0] == c
|
|
assert spawns[0][1] == "bob"
|
|
# c is now running
|
|
with kb.connect() as conn:
|
|
assert kb.get_task(conn, c).status == "running"
|
|
|
|
|
|
def test_dispatch_spawn_failure_releases_claim(kanban_home, all_assignees_spawnable):
|
|
def boom(task, workspace):
|
|
raise RuntimeError("spawn failed")
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="boom", assignee="alice")
|
|
kb.dispatch_once(conn, spawn_fn=boom)
|
|
# Must return to ready so the next tick can retry.
|
|
assert kb.get_task(conn, t).status == "ready"
|
|
assert kb.get_task(conn, t).claim_lock is None
|
|
|
|
|
|
def test_dispatch_max_spawn_counts_existing_running_tasks(
|
|
kanban_home, all_assignees_spawnable
|
|
):
|
|
"""max_spawn is a live concurrency cap, not a per-tick spawn cap.
|
|
|
|
Without counting tasks already in ``running``, every dispatcher tick can
|
|
launch up to ``max_spawn`` more workers while previous workers are still
|
|
alive. Long-running boards then accumulate unbounded worker subprocesses.
|
|
"""
|
|
spawns = []
|
|
|
|
def fake_spawn(task, workspace):
|
|
spawns.append(task.id)
|
|
|
|
with kb.connect() as conn:
|
|
running_a = kb.create_task(conn, title="running-a", assignee="alice")
|
|
running_b = kb.create_task(conn, title="running-b", assignee="bob")
|
|
ready = kb.create_task(conn, title="ready", assignee="carol")
|
|
kb.claim_task(conn, running_a)
|
|
kb.claim_task(conn, running_b)
|
|
|
|
res = kb.dispatch_once(conn, spawn_fn=fake_spawn, max_spawn=2)
|
|
|
|
assert res.spawned == []
|
|
assert spawns == []
|
|
assert kb.get_task(conn, ready).status == "ready"
|
|
|
|
|
|
def test_dispatch_max_spawn_fills_remaining_capacity(
|
|
kanban_home, all_assignees_spawnable
|
|
):
|
|
"""When below cap, dispatch only fills available worker slots."""
|
|
spawns = []
|
|
|
|
def fake_spawn(task, workspace):
|
|
spawns.append(task.id)
|
|
|
|
with kb.connect() as conn:
|
|
running = kb.create_task(conn, title="running", assignee="alice")
|
|
ready_a = kb.create_task(conn, title="ready-a", assignee="bob")
|
|
ready_b = kb.create_task(conn, title="ready-b", assignee="carol")
|
|
kb.claim_task(conn, running)
|
|
|
|
res = kb.dispatch_once(conn, spawn_fn=fake_spawn, max_spawn=2)
|
|
|
|
assert len(res.spawned) == 1
|
|
assert spawns == [ready_a]
|
|
assert kb.get_task(conn, ready_a).status == "running"
|
|
assert kb.get_task(conn, ready_b).status == "ready"
|
|
|
|
|
|
def test_dispatch_reclaims_stale_before_spawning(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x", assignee="alice")
|
|
kb.claim_task(conn, t)
|
|
conn.execute(
|
|
"UPDATE tasks SET claim_expires = ? WHERE id = ?",
|
|
(int(time.time()) - 1, t),
|
|
)
|
|
res = kb.dispatch_once(conn, dry_run=True)
|
|
assert res.reclaimed == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Respawn guard (check_respawn_guard + dispatch_once integration)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_respawn_guard_none_on_fresh_task(kanban_home):
|
|
"""A fresh task with no failures or runs is not guarded."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="fresh", assignee="alice")
|
|
reason = kb.check_respawn_guard(conn, t)
|
|
assert reason is None
|
|
|
|
|
|
def test_respawn_guard_blocker_auth_on_quota_error(kanban_home):
|
|
"""'quota' in last_failure_error triggers blocker_auth."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="quota-task", assignee="alice")
|
|
conn.execute(
|
|
"UPDATE tasks SET last_failure_error = ? WHERE id = ?",
|
|
("API quota exceeded: rate limit hit", t),
|
|
)
|
|
reason = kb.check_respawn_guard(conn, t)
|
|
assert reason == "blocker_auth"
|
|
|
|
|
|
def test_respawn_guard_blocker_auth_on_auth_error(kanban_home):
|
|
"""'unauthorized' in last_failure_error triggers blocker_auth."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="auth-task", assignee="alice")
|
|
conn.execute(
|
|
"UPDATE tasks SET last_failure_error = ? WHERE id = ?",
|
|
("403 Forbidden: unauthorized to access resource", t),
|
|
)
|
|
reason = kb.check_respawn_guard(conn, t)
|
|
assert reason == "blocker_auth"
|
|
|
|
|
|
def test_respawn_guard_blocker_auth_on_authentication_error(kanban_home):
|
|
"""Full word 'Authentication' triggers blocker_auth (regex covers auth\\w*)."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="authn-task", assignee="alice")
|
|
conn.execute(
|
|
"UPDATE tasks SET last_failure_error = ? WHERE id = ?",
|
|
("Authentication failed: invalid credentials", t),
|
|
)
|
|
reason = kb.check_respawn_guard(conn, t)
|
|
assert reason == "blocker_auth"
|
|
|
|
|
|
def test_respawn_guard_blocker_auth_on_authorization_error(kanban_home):
|
|
"""Full word 'authorization' triggers blocker_auth (regex covers auth\\w*)."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="authz-task", assignee="alice")
|
|
conn.execute(
|
|
"UPDATE tasks SET last_failure_error = ? WHERE id = ?",
|
|
("authorization denied for scope repo", t),
|
|
)
|
|
reason = kb.check_respawn_guard(conn, t)
|
|
assert reason == "blocker_auth"
|
|
|
|
|
|
def test_respawn_guard_recent_success(kanban_home):
|
|
"""A completed run within the guard window triggers recent_success."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="already-done", assignee="alice")
|
|
now = int(time.time())
|
|
conn.execute(
|
|
"INSERT INTO task_runs (task_id, status, outcome, started_at, ended_at) "
|
|
"VALUES (?, 'done', 'completed', ?, ?)",
|
|
(t, now - 120, now - 60),
|
|
)
|
|
reason = kb.check_respawn_guard(conn, t)
|
|
assert reason == "recent_success"
|
|
|
|
|
|
def test_respawn_guard_stale_success_not_guarded(kanban_home):
|
|
"""A completed run outside the guard window does not block re-spawn."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="old-done", assignee="alice")
|
|
old_end = int(time.time()) - kb._RESPAWN_GUARD_SUCCESS_WINDOW - 60
|
|
conn.execute(
|
|
"INSERT INTO task_runs (task_id, status, outcome, started_at, ended_at) "
|
|
"VALUES (?, 'done', 'completed', ?, ?)",
|
|
(t, old_end - 300, old_end),
|
|
)
|
|
reason = kb.check_respawn_guard(conn, t)
|
|
assert reason is None
|
|
|
|
|
|
def test_respawn_guard_active_pr_in_comment(kanban_home):
|
|
"""A GitHub PR URL in a recent comment triggers active_pr."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="has-pr", assignee="alice")
|
|
kb.add_comment(
|
|
conn, t, "worker",
|
|
"PR created: https://github.com/totemx-AI/subsidysmart/pull/42",
|
|
)
|
|
reason = kb.check_respawn_guard(conn, t)
|
|
assert reason == "active_pr"
|
|
|
|
|
|
def test_respawn_guard_old_pr_comment_not_guarded(kanban_home):
|
|
"""A GitHub PR URL in a comment older than the PR window does not block."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="old-pr", assignee="alice")
|
|
old_ts = int(time.time()) - kb._RESPAWN_GUARD_PR_WINDOW - 60
|
|
conn.execute(
|
|
"INSERT INTO task_comments (task_id, author, body, created_at) "
|
|
"VALUES (?, 'worker', "
|
|
"'PR: https://github.com/totemx-AI/subsidysmart/pull/10', ?)",
|
|
(t, old_ts),
|
|
)
|
|
reason = kb.check_respawn_guard(conn, t)
|
|
assert reason is None
|
|
|
|
|
|
def test_dispatch_respawn_guard_defers_auth_error_without_auto_block(
|
|
kanban_home, all_assignees_spawnable
|
|
):
|
|
"""dispatch_once defers (does NOT auto-block) a ready task whose last
|
|
error is a blocker_auth.
|
|
|
|
The old behaviour auto-blocked on first occurrence, which was too
|
|
aggressive: a transient 429 rate-limit (which typically clears in
|
|
seconds to minutes) would end up requiring manual unblock. The new
|
|
behaviour defers the spawn this tick; the task stays in ``ready``
|
|
and gets another chance next tick. If the auth error genuinely
|
|
persists, the existing ``consecutive_failures`` circuit breaker
|
|
will auto-block via the normal failure-limit path.
|
|
"""
|
|
spawned_ids = []
|
|
|
|
def fake_spawn(task, workspace):
|
|
spawned_ids.append(task.id)
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="quota-storm", assignee="alice")
|
|
conn.execute(
|
|
"UPDATE tasks SET last_failure_error = ? WHERE id = ?",
|
|
("rate limit exceeded: 429 Too Many Requests", t),
|
|
)
|
|
res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
|
|
|
|
# Critical: task is NOT auto-blocked on first occurrence.
|
|
assert t not in res.auto_blocked, (
|
|
f"blocker_auth should defer, not auto-block on first occurrence; "
|
|
f"got auto_blocked={res.auto_blocked!r}"
|
|
)
|
|
# It IS recorded as respawn_guarded with the reason.
|
|
assert (t, "blocker_auth") in res.respawn_guarded, (
|
|
f"expected (task_id, 'blocker_auth') in respawn_guarded; "
|
|
f"got {res.respawn_guarded!r}"
|
|
)
|
|
# And it's NOT spawned this tick.
|
|
assert t not in spawned_ids
|
|
# Status stays ``ready`` so a future tick (or operator action) can
|
|
# retry without manual unblock.
|
|
with kb.connect() as conn:
|
|
assert kb.get_task(conn, t).status == "ready"
|
|
|
|
|
|
def test_dispatch_respawn_guard_skips_recent_success(
|
|
kanban_home, all_assignees_spawnable
|
|
):
|
|
"""dispatch_once skips (but does not block) a task with a recent completed run."""
|
|
spawned_ids = []
|
|
|
|
def fake_spawn(task, workspace):
|
|
spawned_ids.append(task.id)
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="recent-winner", assignee="alice")
|
|
now = int(time.time())
|
|
conn.execute(
|
|
"INSERT INTO task_runs (task_id, status, outcome, started_at, ended_at) "
|
|
"VALUES (?, 'done', 'completed', ?, ?)",
|
|
(t, now - 300, now - 60),
|
|
)
|
|
res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
|
|
|
|
assert (t, "recent_success") in res.respawn_guarded
|
|
assert t not in spawned_ids
|
|
assert t not in res.auto_blocked
|
|
with kb.connect() as conn:
|
|
assert kb.get_task(conn, t).status == "ready" # not blocked, just skipped
|
|
|
|
|
|
def test_dispatch_respawn_guard_skips_active_pr(
|
|
kanban_home, all_assignees_spawnable
|
|
):
|
|
"""dispatch_once skips (but does not block) a task with an active PR comment."""
|
|
spawned_ids = []
|
|
|
|
def fake_spawn(task, workspace):
|
|
spawned_ids.append(task.id)
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="has-pr", assignee="alice")
|
|
kb.add_comment(
|
|
conn, t, "worker",
|
|
"Opened https://github.com/totemx-AI/subsidysmart/pull/99",
|
|
)
|
|
res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
|
|
|
|
assert (t, "active_pr") in res.respawn_guarded
|
|
assert t not in spawned_ids
|
|
assert t not in res.auto_blocked
|
|
with kb.connect() as conn:
|
|
assert kb.get_task(conn, t).status == "ready"
|
|
|
|
|
|
def test_dispatch_respawn_guard_dry_run_no_auto_block(
|
|
kanban_home, all_assignees_spawnable
|
|
):
|
|
"""In dry_run mode, blocker_auth tasks are recorded in respawn_guarded (not auto-blocked)."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="dry-quota", assignee="alice")
|
|
conn.execute(
|
|
"UPDATE tasks SET last_failure_error = ? WHERE id = ?",
|
|
("quota exceeded", t),
|
|
)
|
|
res = kb.dispatch_once(conn, dry_run=True)
|
|
|
|
assert (t, "blocker_auth") in res.respawn_guarded
|
|
assert t not in res.auto_blocked
|
|
with kb.connect() as conn:
|
|
assert kb.get_task(conn, t).status == "ready" # dry_run: no writes
|
|
|
|
|
|
def test_dispatch_respawn_guard_allows_clean_task(
|
|
kanban_home, all_assignees_spawnable
|
|
):
|
|
"""A task with no guard triggers is spawned normally."""
|
|
spawned_ids = []
|
|
|
|
def fake_spawn(task, workspace):
|
|
spawned_ids.append(task.id)
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="clean-task", assignee="alice")
|
|
res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
|
|
|
|
assert t in spawned_ids
|
|
assert not res.respawn_guarded
|
|
assert t not in res.auto_blocked
|
|
|
|
|
|
def test_dispatch_respawn_guard_emits_event_for_skipped_task(
|
|
kanban_home, all_assignees_spawnable
|
|
):
|
|
"""dispatch_once emits a respawn_guarded task_event so operators can diagnose stuck-ready tasks."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="event-check", assignee="alice")
|
|
now = int(time.time())
|
|
conn.execute(
|
|
"INSERT INTO task_runs (task_id, status, outcome, started_at, ended_at) "
|
|
"VALUES (?, 'done', 'completed', ?, ?)",
|
|
(t, now - 300, now - 60),
|
|
)
|
|
kb.dispatch_once(conn, spawn_fn=lambda task, ws: None)
|
|
events = kb.list_events(conn, t)
|
|
|
|
kinds = [e.kind for e in events]
|
|
assert "respawn_guarded" in kinds
|
|
guarded_evt = next(e for e in events if e.kind == "respawn_guarded")
|
|
# Event.payload is already parsed as a dict by list_events.
|
|
assert isinstance(guarded_evt.payload, dict)
|
|
assert guarded_evt.payload.get("reason") == "recent_success"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Workspace resolution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_scratch_workspace_created_under_hermes_home(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="x")
|
|
task = kb.get_task(conn, t)
|
|
ws = kb.resolve_workspace(task)
|
|
assert ws.exists()
|
|
assert ws.is_dir()
|
|
assert "kanban" in str(ws)
|
|
|
|
|
|
def test_dir_workspace_honors_given_path(kanban_home, tmp_path):
|
|
target = tmp_path / "my-vault"
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(
|
|
conn, title="biz", workspace_kind="dir", workspace_path=str(target)
|
|
)
|
|
task = kb.get_task(conn, t)
|
|
ws = kb.resolve_workspace(task)
|
|
assert ws == target
|
|
assert ws.exists()
|
|
|
|
|
|
def test_worktree_workspace_returns_intended_path(kanban_home, tmp_path):
|
|
target = str(tmp_path / ".worktrees" / "my-task")
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(
|
|
conn, title="ship", workspace_kind="worktree", workspace_path=target
|
|
)
|
|
task = kb.get_task(conn, t)
|
|
ws = kb.resolve_workspace(task)
|
|
# We do NOT auto-create worktrees; the worker's skill handles that.
|
|
assert str(ws) == target
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tenancy
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_tenant_column_filters_listings(kanban_home):
|
|
with kb.connect() as conn:
|
|
kb.create_task(conn, title="a1", tenant="biz-a")
|
|
kb.create_task(conn, title="b1", tenant="biz-b")
|
|
kb.create_task(conn, title="shared") # no tenant
|
|
biz_a = kb.list_tasks(conn, tenant="biz-a")
|
|
biz_b = kb.list_tasks(conn, tenant="biz-b")
|
|
assert [t.title for t in biz_a] == ["a1"]
|
|
assert [t.title for t in biz_b] == ["b1"]
|
|
|
|
|
|
def test_list_tasks_filters_workflow_template_and_step(kanban_home):
|
|
with kb.connect() as conn:
|
|
ta = kb.create_task(conn, title="alpha")
|
|
tb = kb.create_task(conn, title="beta")
|
|
conn.execute(
|
|
"UPDATE tasks SET workflow_template_id=?, current_step_key=? WHERE id=?",
|
|
("wf1", "step_x", ta),
|
|
)
|
|
conn.execute(
|
|
"UPDATE tasks SET workflow_template_id=?, current_step_key=? WHERE id=?",
|
|
("wf1", "step_y", tb),
|
|
)
|
|
conn.commit()
|
|
by_wf = kb.list_tasks(conn, workflow_template_id="wf1")
|
|
by_step = kb.list_tasks(conn, current_step_key="step_x")
|
|
assert {x.id for x in by_wf} == {ta, tb}
|
|
assert [x.id for x in by_step] == [ta]
|
|
|
|
|
|
def test_list_runs_state_filter_requires_pair_and_valid_type(kanban_home):
|
|
with kb.connect() as conn:
|
|
tid = kb.create_task(conn, title="t", assignee="alice")
|
|
with kb.connect() as conn:
|
|
with pytest.raises(ValueError, match="both"):
|
|
kb.list_runs(conn, tid, state_type="status", state_name=None)
|
|
with pytest.raises(ValueError, match="both"):
|
|
kb.list_runs(conn, tid, state_type=None, state_name="done")
|
|
with pytest.raises(ValueError, match="state_type"):
|
|
kb.list_runs(conn, tid, state_type="nope", state_name="done")
|
|
|
|
|
|
def test_list_runs_filters_by_outcome_value(kanban_home):
|
|
with kb.connect() as conn:
|
|
tid = kb.create_task(conn, title="t", assignee="alice")
|
|
kb.complete_task(conn, tid, summary="ok")
|
|
matching = kb.list_runs(conn, tid, state_type="outcome", state_name="completed")
|
|
empty = kb.list_runs(conn, tid, state_type="outcome", state_name="blocked")
|
|
assert matching
|
|
assert not empty
|
|
|
|
|
|
def test_tenant_propagates_to_events(kanban_home):
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="tenant-task", tenant="biz-a")
|
|
events = kb.list_events(conn, t)
|
|
# The "created" event should have tenant in its payload.
|
|
created = [e for e in events if e.kind == "created"]
|
|
assert created and created[0].payload.get("tenant") == "biz-a"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Originating session id (ACP propagation)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_create_task_stamps_session_id(kanban_home):
|
|
with kb.connect() as conn:
|
|
tid = kb.create_task(
|
|
conn, title="from chat", session_id="acp-sess-123"
|
|
)
|
|
t = kb.get_task(conn, tid)
|
|
assert t is not None
|
|
assert t.session_id == "acp-sess-123"
|
|
|
|
|
|
def test_create_task_session_id_defaults_to_none(kanban_home):
|
|
with kb.connect() as conn:
|
|
tid = kb.create_task(conn, title="cli-created")
|
|
t = kb.get_task(conn, tid)
|
|
assert t is not None
|
|
assert t.session_id is None
|
|
|
|
|
|
def test_session_id_filters_listings(kanban_home):
|
|
with kb.connect() as conn:
|
|
kb.create_task(conn, title="s1-a", session_id="sess-1")
|
|
kb.create_task(conn, title="s1-b", session_id="sess-1")
|
|
kb.create_task(conn, title="s2-a", session_id="sess-2")
|
|
kb.create_task(conn, title="cli-only") # no session
|
|
sess1 = kb.list_tasks(conn, session_id="sess-1")
|
|
sess2 = kb.list_tasks(conn, session_id="sess-2")
|
|
unscoped = kb.list_tasks(conn)
|
|
assert sorted(t.title for t in sess1) == ["s1-a", "s1-b"]
|
|
assert [t.title for t in sess2] == ["s2-a"]
|
|
# Unscoped list still returns everything (legacy NULL rows visible).
|
|
assert len(unscoped) == 4
|
|
|
|
|
|
def test_session_id_index_exists(kanban_home):
|
|
"""The migration creates an index on session_id for cheap per-session
|
|
list queries on busy boards. Without it, a chat-scoped poll would
|
|
full-scan the tasks table."""
|
|
with kb.connect() as conn:
|
|
rows = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='index' "
|
|
"AND tbl_name='tasks'"
|
|
).fetchall()
|
|
names = {r["name"] for r in rows}
|
|
assert "idx_tasks_session_id" in names
|
|
|
|
|
|
def test_session_id_compose_with_tenant_filter(kanban_home):
|
|
"""A client may want both `tenant=scarf:foo` AND `session=acp-x` —
|
|
the filters must AND, not replace."""
|
|
with kb.connect() as conn:
|
|
kb.create_task(
|
|
conn, title="match", tenant="scarf:foo", session_id="acp-x"
|
|
)
|
|
kb.create_task(
|
|
conn, title="wrong-tenant", tenant="other", session_id="acp-x"
|
|
)
|
|
kb.create_task(
|
|
conn, title="wrong-session",
|
|
tenant="scarf:foo", session_id="acp-y",
|
|
)
|
|
rows = kb.list_tasks(
|
|
conn, tenant="scarf:foo", session_id="acp-x"
|
|
)
|
|
assert [t.title for t in rows] == ["match"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared-board path resolution (issue #19348)
|
|
#
|
|
# The kanban board is a cross-profile coordination primitive: a worker
|
|
# spawned with `hermes -p <profile>` must read/write the same kanban.db
|
|
# as the dispatcher that claimed the task. These tests exercise the
|
|
# path-resolution layer directly and would have caught the regression
|
|
# where `kanban_db_path()` resolved to the active profile's HERMES_HOME.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSharedBoardPaths:
|
|
"""`kanban_home`/`kanban_db_path`/`workspaces_root`/`worker_log_path`
|
|
must anchor at the **shared root**, not the active profile's HERMES_HOME."""
|
|
|
|
def _set_home(self, monkeypatch, tmp_path, hermes_home):
|
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
monkeypatch.delenv("HERMES_KANBAN_HOME", raising=False)
|
|
|
|
def test_default_install_anchors_at_home_dot_hermes(
|
|
self, tmp_path, monkeypatch
|
|
):
|
|
# Standard install: HERMES_HOME == ~/.hermes, no profile active.
|
|
default_home = tmp_path / ".hermes"
|
|
default_home.mkdir()
|
|
self._set_home(monkeypatch, tmp_path, default_home)
|
|
|
|
assert kb.kanban_home() == default_home
|
|
assert kb.kanban_db_path() == default_home / "kanban.db"
|
|
assert kb.workspaces_root() == default_home / "kanban" / "workspaces"
|
|
assert (
|
|
kb.worker_log_path("t_demo")
|
|
== default_home / "kanban" / "logs" / "t_demo.log"
|
|
)
|
|
|
|
def test_profile_worker_resolves_to_shared_root(
|
|
self, tmp_path, monkeypatch
|
|
):
|
|
# Reproduces the bug: dispatcher uses ~/.hermes/kanban.db,
|
|
# worker spawned with -p <profile> previously resolved to
|
|
# ~/.hermes/profiles/<profile>/kanban.db. After the fix both
|
|
# converge on ~/.hermes/kanban.db.
|
|
default_home = tmp_path / ".hermes"
|
|
default_home.mkdir()
|
|
profile_home = default_home / "profiles" / "nehemiahkanban"
|
|
profile_home.mkdir(parents=True)
|
|
self._set_home(monkeypatch, tmp_path, profile_home)
|
|
|
|
# All four resolvers must anchor at the shared root, not the
|
|
# profile-local HERMES_HOME.
|
|
assert kb.kanban_home() == default_home
|
|
assert kb.kanban_db_path() == default_home / "kanban.db"
|
|
assert kb.workspaces_root() == default_home / "kanban" / "workspaces"
|
|
assert (
|
|
kb.worker_log_path("t_0d214f19")
|
|
== default_home / "kanban" / "logs" / "t_0d214f19.log"
|
|
)
|
|
|
|
# Sanity: the profile-local path that used to be returned is
|
|
# explicitly NOT what we resolve to anymore.
|
|
assert kb.kanban_db_path() != profile_home / "kanban.db"
|
|
|
|
def test_dispatcher_and_profile_worker_converge(
|
|
self, tmp_path, monkeypatch
|
|
):
|
|
# End-to-end convergence: resolve the path under each side's
|
|
# HERMES_HOME and confirm equality. This is the property the
|
|
# dispatcher/worker handoff actually depends on.
|
|
default_home = tmp_path / ".hermes"
|
|
default_home.mkdir()
|
|
profile_home = default_home / "profiles" / "coder"
|
|
profile_home.mkdir(parents=True)
|
|
|
|
# Dispatcher's perspective.
|
|
self._set_home(monkeypatch, tmp_path, default_home)
|
|
dispatcher_db = kb.kanban_db_path()
|
|
dispatcher_ws = kb.workspaces_root()
|
|
dispatcher_log = kb.worker_log_path("t_handoff")
|
|
|
|
# Worker's perspective (profile activated by `hermes -p coder`).
|
|
monkeypatch.setenv("HERMES_HOME", str(profile_home))
|
|
worker_db = kb.kanban_db_path()
|
|
worker_ws = kb.workspaces_root()
|
|
worker_log = kb.worker_log_path("t_handoff")
|
|
|
|
assert dispatcher_db == worker_db
|
|
assert dispatcher_ws == worker_ws
|
|
assert dispatcher_log == worker_log
|
|
|
|
def test_docker_custom_hermes_home_uses_env_path_directly(
|
|
self, tmp_path, monkeypatch
|
|
):
|
|
# Docker / custom deployment: HERMES_HOME points outside ~/.hermes.
|
|
# `get_default_hermes_root()` returns env_home directly when it
|
|
# is not a `<root>/profiles/<name>` shape and not under
|
|
# `Path.home() / ".hermes"`.
|
|
custom_root = tmp_path / "opt" / "hermes"
|
|
custom_root.mkdir(parents=True)
|
|
self._set_home(monkeypatch, tmp_path, custom_root)
|
|
|
|
assert kb.kanban_home() == custom_root
|
|
assert kb.kanban_db_path() == custom_root / "kanban.db"
|
|
|
|
def test_docker_profile_layout_uses_grandparent(
|
|
self, tmp_path, monkeypatch
|
|
):
|
|
# Docker profile shape: HERMES_HOME=/opt/hermes/profiles/coder;
|
|
# `get_default_hermes_root()` walks up to /opt/hermes because
|
|
# the immediate parent dir is named "profiles".
|
|
custom_root = tmp_path / "opt" / "hermes"
|
|
profile = custom_root / "profiles" / "coder"
|
|
profile.mkdir(parents=True)
|
|
self._set_home(monkeypatch, tmp_path, profile)
|
|
|
|
assert kb.kanban_home() == custom_root
|
|
assert kb.kanban_db_path() == custom_root / "kanban.db"
|
|
|
|
def test_explicit_override_via_hermes_kanban_home(
|
|
self, tmp_path, monkeypatch
|
|
):
|
|
# Explicit override: HERMES_KANBAN_HOME beats every other
|
|
# resolution rule.
|
|
default_home = tmp_path / ".hermes"
|
|
profile_home = default_home / "profiles" / "any"
|
|
profile_home.mkdir(parents=True)
|
|
override = tmp_path / "shared-board"
|
|
override.mkdir()
|
|
|
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
|
monkeypatch.setenv("HERMES_HOME", str(profile_home))
|
|
monkeypatch.setenv("HERMES_KANBAN_HOME", str(override))
|
|
|
|
assert kb.kanban_home() == override
|
|
assert kb.kanban_db_path() == override / "kanban.db"
|
|
assert kb.workspaces_root() == override / "kanban" / "workspaces"
|
|
|
|
def test_empty_override_falls_through(self, tmp_path, monkeypatch):
|
|
# Empty/whitespace override is treated as unset.
|
|
default_home = tmp_path / ".hermes"
|
|
default_home.mkdir()
|
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
|
monkeypatch.setenv("HERMES_HOME", str(default_home))
|
|
monkeypatch.setenv("HERMES_KANBAN_HOME", " ")
|
|
|
|
assert kb.kanban_home() == default_home
|
|
|
|
def test_dispatcher_and_worker_share_a_real_database(
|
|
self, tmp_path, monkeypatch
|
|
):
|
|
# Belt-and-suspenders: round-trip a task across the two
|
|
# HERMES_HOME perspectives via a real SQLite file. Without the
|
|
# fix the worker would open a different file and see no rows.
|
|
default_home = tmp_path / ".hermes"
|
|
default_home.mkdir()
|
|
profile_home = default_home / "profiles" / "nehemiahkanban"
|
|
profile_home.mkdir(parents=True)
|
|
|
|
# Dispatcher creates the board and a task.
|
|
self._set_home(monkeypatch, tmp_path, default_home)
|
|
kb.init_db()
|
|
with kb.connect() as conn:
|
|
task_id = kb.create_task(conn, title="cross-profile")
|
|
|
|
# Worker switches to the profile HERMES_HOME and reads.
|
|
monkeypatch.setenv("HERMES_HOME", str(profile_home))
|
|
with kb.connect() as conn:
|
|
task = kb.get_task(conn, task_id)
|
|
assert task is not None
|
|
assert task.title == "cross-profile"
|
|
|
|
def test_hermes_kanban_db_pin_beats_kanban_home(
|
|
self, tmp_path, monkeypatch
|
|
):
|
|
# HERMES_KANBAN_DB pins the file path directly and beats both
|
|
# HERMES_KANBAN_HOME and the `get_default_hermes_root()` path.
|
|
# This is the env the dispatcher injects into workers.
|
|
default_home = tmp_path / ".hermes"
|
|
default_home.mkdir()
|
|
umbrella = tmp_path / "umbrella"
|
|
umbrella.mkdir()
|
|
pinned_db = tmp_path / "pinned" / "board.db"
|
|
pinned_db.parent.mkdir()
|
|
|
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
|
monkeypatch.setenv("HERMES_HOME", str(default_home))
|
|
monkeypatch.setenv("HERMES_KANBAN_HOME", str(umbrella))
|
|
monkeypatch.setenv("HERMES_KANBAN_DB", str(pinned_db))
|
|
|
|
assert kb.kanban_db_path() == pinned_db
|
|
# workspaces_root still follows HERMES_KANBAN_HOME -- the pins
|
|
# are independent.
|
|
assert kb.workspaces_root() == umbrella / "kanban" / "workspaces"
|
|
|
|
def test_hermes_kanban_workspaces_root_pin_beats_kanban_home(
|
|
self, tmp_path, monkeypatch
|
|
):
|
|
# HERMES_KANBAN_WORKSPACES_ROOT pins the workspaces root directly.
|
|
default_home = tmp_path / ".hermes"
|
|
default_home.mkdir()
|
|
umbrella = tmp_path / "umbrella"
|
|
umbrella.mkdir()
|
|
pinned_ws = tmp_path / "pinned-workspaces"
|
|
pinned_ws.mkdir()
|
|
|
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
|
monkeypatch.setenv("HERMES_HOME", str(default_home))
|
|
monkeypatch.setenv("HERMES_KANBAN_HOME", str(umbrella))
|
|
monkeypatch.setenv("HERMES_KANBAN_WORKSPACES_ROOT", str(pinned_ws))
|
|
|
|
assert kb.workspaces_root() == pinned_ws
|
|
# kanban_db_path still follows HERMES_KANBAN_HOME.
|
|
assert kb.kanban_db_path() == umbrella / "kanban.db"
|
|
|
|
def test_empty_per_path_overrides_fall_through(
|
|
self, tmp_path, monkeypatch
|
|
):
|
|
# Empty/whitespace pins are treated as unset, same as
|
|
# HERMES_KANBAN_HOME.
|
|
default_home = tmp_path / ".hermes"
|
|
default_home.mkdir()
|
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
|
monkeypatch.setenv("HERMES_HOME", str(default_home))
|
|
monkeypatch.setenv("HERMES_KANBAN_DB", " ")
|
|
monkeypatch.setenv("HERMES_KANBAN_WORKSPACES_ROOT", "")
|
|
|
|
assert kb.kanban_db_path() == default_home / "kanban.db"
|
|
assert kb.workspaces_root() == default_home / "kanban" / "workspaces"
|
|
|
|
def test_dispatcher_spawn_injects_kanban_db_and_workspaces_root(
|
|
self, tmp_path, monkeypatch
|
|
):
|
|
# The dispatcher's `_default_spawn` must inject HERMES_KANBAN_DB
|
|
# and HERMES_KANBAN_WORKSPACES_ROOT into the worker env so the
|
|
# worker converges on the dispatcher's paths even when the
|
|
# `-p <profile>` flag rewrites HERMES_HOME.
|
|
default_home = tmp_path / ".hermes"
|
|
default_home.mkdir()
|
|
self._set_home(monkeypatch, tmp_path, default_home)
|
|
|
|
captured = {}
|
|
|
|
class _FakePopen:
|
|
def __init__(self, cmd, **kwargs):
|
|
captured["cmd"] = cmd
|
|
captured["env"] = kwargs.get("env", {})
|
|
self.pid = 4242
|
|
|
|
monkeypatch.setattr("subprocess.Popen", _FakePopen)
|
|
|
|
task = kb.Task(
|
|
id="t_dispatch_env",
|
|
title="x",
|
|
body=None,
|
|
assignee="coder",
|
|
status="ready",
|
|
priority=0,
|
|
created_by=None,
|
|
created_at=0,
|
|
started_at=None,
|
|
completed_at=None,
|
|
workspace_kind="worktree",
|
|
workspace_path=str(tmp_path / "ws"),
|
|
claim_lock=None,
|
|
claim_expires=None,
|
|
tenant=None,
|
|
branch_name="wt/t_dispatch_env",
|
|
)
|
|
kb._default_spawn(task, str(tmp_path / "ws"))
|
|
|
|
env = captured["env"]
|
|
assert env["HERMES_KANBAN_DB"] == str(default_home / "kanban.db")
|
|
assert env["HERMES_KANBAN_WORKSPACES_ROOT"] == str(
|
|
default_home / "kanban" / "workspaces"
|
|
)
|
|
assert env["HERMES_KANBAN_TASK"] == "t_dispatch_env"
|
|
assert env["HERMES_KANBAN_BRANCH"] == "wt/t_dispatch_env"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# latest_summary / latest_summaries — surface task_runs.summary handoffs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_latest_summary_returns_none_when_no_runs(kanban_home):
|
|
"""A freshly-created task has no runs and therefore no summary."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="fresh", assignee="alice")
|
|
assert kb.latest_summary(conn, t) is None
|
|
|
|
|
|
def test_latest_summary_returns_summary_after_complete(kanban_home):
|
|
"""``complete_task(summary=...)`` is the canonical kanban-worker
|
|
handoff; ``latest_summary`` must surface it so dashboards/CLI can
|
|
render what the worker actually did."""
|
|
handoff = "shipped 3 files, ran tests, opened PR #42"
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="work", assignee="alice")
|
|
kb.complete_task(conn, t, summary=handoff)
|
|
assert kb.latest_summary(conn, t) == handoff
|
|
|
|
|
|
def test_latest_summary_picks_newest_when_multiple_runs(kanban_home):
|
|
"""When a task has been re-run (block → unblock → complete), the
|
|
newest run's summary wins. We unblock to take the task back to
|
|
``ready``, then complete a second time and verify the second
|
|
summary surfaces."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="retry", assignee="alice")
|
|
kb.complete_task(conn, t, summary="first attempt")
|
|
# Move back to ready by direct SQL — block_task / unblock_task
|
|
# paths require an active claim, but we just want a second run
|
|
# row to exist with a later ended_at.
|
|
conn.execute(
|
|
"UPDATE tasks SET status='ready', completed_at=NULL WHERE id=?",
|
|
(t,),
|
|
)
|
|
# Sleep 1s so the second run's ended_at is provably later than
|
|
# the first (complete_task uses int(time.time())).
|
|
time.sleep(1.05)
|
|
kb.complete_task(conn, t, summary="second attempt — final")
|
|
assert kb.latest_summary(conn, t) == "second attempt — final"
|
|
|
|
|
|
def test_latest_summary_skips_empty_string(kanban_home):
|
|
"""A run with an empty-string summary should not mask an earlier
|
|
populated one — empty strings carry no information."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="t", assignee="alice")
|
|
kb.complete_task(conn, t, summary="real handoff")
|
|
# Inject a later run with empty summary directly. Workers
|
|
# writing "" instead of None is a real shape we want to ignore.
|
|
conn.execute(
|
|
"INSERT INTO task_runs (task_id, status, started_at, ended_at, "
|
|
"outcome, summary) VALUES (?, 'done', ?, ?, 'completed', ?)",
|
|
(t, int(time.time()) + 1, int(time.time()) + 2, ""),
|
|
)
|
|
conn.commit()
|
|
assert kb.latest_summary(conn, t) == "real handoff"
|
|
|
|
|
|
def test_latest_summaries_batch_omits_tasks_without_summary(kanban_home):
|
|
"""``latest_summaries`` is the dashboard's N+1 escape hatch — it
|
|
must return only entries for tasks that actually have a summary,
|
|
keep the per-task latest, and accept an empty input gracefully."""
|
|
with kb.connect() as conn:
|
|
t1 = kb.create_task(conn, title="a", assignee="alice")
|
|
t2 = kb.create_task(conn, title="b", assignee="bob")
|
|
t3 = kb.create_task(conn, title="c", assignee="carol")
|
|
kb.complete_task(conn, t1, summary="alpha")
|
|
kb.complete_task(conn, t3, summary="charlie")
|
|
out = kb.latest_summaries(conn, [t1, t2, t3])
|
|
assert out == {t1: "alpha", t3: "charlie"}
|
|
# Empty input → empty dict, no SQL syntax error from "IN ()".
|
|
assert kb.latest_summaries(conn, []) == {}
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# NFS / network-filesystem fallback (see hermes_state.apply_wal_with_fallback)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_connect_falls_back_to_delete_on_locking_protocol(kanban_home, caplog):
|
|
"""kanban_db.connect() must handle ``locking protocol`` on NFS/SMB.
|
|
|
|
Without this fallback, the gateway's kanban dispatcher crashes every
|
|
60s and the kanban migration (``consecutive_failures`` ADD COLUMN) is
|
|
retried forever — which is what the real-world user report shows
|
|
(see hermes-agent issue #22032).
|
|
"""
|
|
import sqlite3 as _sqlite3
|
|
from unittest.mock import patch as _patch
|
|
|
|
# Clear module cache so a fresh connect() is attempted
|
|
kb._INITIALIZED_PATHS.clear()
|
|
|
|
real_connect = _sqlite3.connect
|
|
|
|
class _WalBlockingConnection(_sqlite3.Connection):
|
|
def execute(self, sql, *args, **kwargs): # type: ignore[override]
|
|
if "journal_mode=wal" in sql.lower().replace(" ", ""):
|
|
raise _sqlite3.OperationalError("locking protocol")
|
|
return super().execute(sql, *args, **kwargs)
|
|
|
|
def wal_blocking_connect(*args, **kwargs):
|
|
return real_connect(
|
|
*args, factory=_WalBlockingConnection, **kwargs
|
|
)
|
|
|
|
with _patch("hermes_cli.kanban_db.sqlite3.connect", side_effect=wal_blocking_connect):
|
|
with caplog.at_level("WARNING", logger="hermes_state"):
|
|
conn = kb.connect()
|
|
|
|
# One fallback warning, naming kanban.db
|
|
warnings = [
|
|
r for r in caplog.records
|
|
if r.levelname == "WARNING" and "kanban.db" in r.getMessage()
|
|
]
|
|
assert len(warnings) >= 1, (
|
|
f"Expected a kanban.db WARNING, got: {[r.getMessage() for r in caplog.records]}"
|
|
)
|
|
|
|
# DB still usable end-to-end — create + list a task
|
|
t = kb.create_task(conn, title="post-fallback task")
|
|
tasks = kb.list_tasks(conn)
|
|
assert any(row.id == t for row in tasks)
|
|
conn.close()
|
|
|
|
|
|
def test_unlink_tasks_triggers_recompute_ready(kanban_home):
|
|
"""Regression test for issue #22459.
|
|
|
|
Removing a dependency via unlink_tasks must immediately promote the child
|
|
to ready when all remaining parents are done — same contract as
|
|
complete_task and unblock_task.
|
|
|
|
Before the fix, child stayed 'todo' indefinitely after unlink; only the
|
|
next dispatcher tick or a manual 'hermes kanban recompute' would promote it.
|
|
"""
|
|
with kb.connect() as conn:
|
|
# A is done.
|
|
a = kb.create_task(conn, title="parent-done")
|
|
kb.complete_task(conn, a)
|
|
|
|
# C is running (not done) — blocks child B.
|
|
c = kb.create_task(conn, title="parent-running")
|
|
kb.claim_task(conn, c, claimer="worker:1")
|
|
|
|
# B depends on both A (done) and C (running) → stays todo.
|
|
b = kb.create_task(conn, title="child", parents=[a, c])
|
|
assert kb.get_task(conn, b).status == "todo"
|
|
|
|
# Remove the blocking dependency C → B.
|
|
removed = kb.unlink_tasks(conn, c, b)
|
|
assert removed is True
|
|
|
|
# B's only remaining parent is A (done) → must be ready immediately.
|
|
assert kb.get_task(conn, b).status == "ready", (
|
|
"child should promote to ready immediately after unlink_tasks "
|
|
"removes its last blocking dependency"
|
|
)
|
|
|
|
|
|
def test_archive_task_triggers_recompute_ready_for_dependents(kanban_home):
|
|
"""Archiving a parent must immediately unblock its children.
|
|
|
|
``recompute_ready()`` already treats ``archived`` parents as satisfied
|
|
dependencies, just like ``done``. Regression: ``archive_task()`` updated
|
|
the parent row but never ran the ready-promotion pass, so children stayed
|
|
stuck in ``todo`` until a later dispatcher tick.
|
|
"""
|
|
with kb.connect() as conn:
|
|
parent = kb.create_task(conn, title="obsolete parent")
|
|
child = kb.create_task(conn, title="child", parents=[parent])
|
|
|
|
assert kb.get_task(conn, child).status == "todo"
|
|
assert kb.archive_task(conn, parent) is True
|
|
|
|
assert kb.get_task(conn, child).status == "ready", (
|
|
"child should promote to ready immediately after its last blocking "
|
|
"parent is archived"
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _add_column_if_missing / _migrate_add_optional_columns idempotency (#21708)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_add_column_if_missing_is_idempotent_on_race(kanban_home):
|
|
"""``_add_column_if_missing`` must swallow 'duplicate column name' errors.
|
|
|
|
Regression for #21708: the kanban dispatcher opens the DB twice per tick
|
|
(once via _tick_once_for_board, once via init_db's discard-and-reconnect
|
|
path). A second concurrent connection runs _migrate_add_optional_columns
|
|
before the first one commits, so ALTER TABLE raises OperationalError with
|
|
'duplicate column name: consecutive_failures'. Without the idempotency
|
|
guard that crashes the dispatcher on the first tick after every restart.
|
|
"""
|
|
import sqlite3
|
|
|
|
conn = sqlite3.connect(":memory:")
|
|
conn.row_factory = sqlite3.Row
|
|
conn.execute(
|
|
"CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT NOT NULL)"
|
|
)
|
|
|
|
# First call adds the column — returns True.
|
|
added = kb._add_column_if_missing(conn, "tasks", "extra_col", "extra_col TEXT")
|
|
assert added is True
|
|
cols = {row["name"] for row in conn.execute("PRAGMA table_info(tasks)")}
|
|
assert "extra_col" in cols
|
|
|
|
# Second call on same connection — column already exists — must return
|
|
# False without raising, simulating the race the dispatcher hits.
|
|
added_again = kb._add_column_if_missing(
|
|
conn, "tasks", "extra_col", "extra_col TEXT"
|
|
)
|
|
assert added_again is False
|
|
|
|
conn.close()
|
|
|
|
|
|
def test_migrate_add_optional_columns_tolerates_concurrent_migration(kanban_home):
|
|
"""Full _migrate_add_optional_columns must not raise when columns already
|
|
exist (issue #21708 race window — two connections migrate concurrently)."""
|
|
import sqlite3
|
|
|
|
# Schema already in fully-migrated state (all optional columns present).
|
|
conn = sqlite3.connect(":memory:")
|
|
conn.row_factory = sqlite3.Row
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE tasks (
|
|
id INTEGER PRIMARY KEY,
|
|
title TEXT NOT NULL,
|
|
tenant TEXT,
|
|
result TEXT,
|
|
idempotency_key TEXT,
|
|
branch_name TEXT,
|
|
consecutive_failures INTEGER NOT NULL DEFAULT 0,
|
|
worker_pid INTEGER,
|
|
last_failure_error TEXT,
|
|
max_runtime_seconds INTEGER,
|
|
last_heartbeat_at INTEGER,
|
|
current_run_id INTEGER,
|
|
workflow_template_id TEXT,
|
|
current_step_key TEXT,
|
|
skills TEXT,
|
|
max_retries INTEGER,
|
|
session_id TEXT
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE task_events (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
task_id TEXT NOT NULL DEFAULT '',
|
|
run_id INTEGER,
|
|
kind TEXT NOT NULL DEFAULT '',
|
|
payload TEXT,
|
|
created_at INTEGER NOT NULL DEFAULT 0
|
|
)
|
|
"""
|
|
)
|
|
|
|
# Running migration on an already-migrated schema must not raise.
|
|
kb._migrate_add_optional_columns(conn)
|
|
conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dispatcher spawn invocation — _resolve_hermes_argv()
|
|
#
|
|
# Workers spawned by the dispatcher must use a `hermes` invocation that does
|
|
# not depend on PATH being set up correctly. cron jobs, systemd User= services,
|
|
# launchd jobs, and other detached processes routinely run with a stripped
|
|
# $PATH that doesn't include the venv's bin/, so a bare `["hermes", ...]`
|
|
# spawn fails with FileNotFoundError and the task gets stuck. The resolver
|
|
# prefers the PATH shim (familiar `ps` output) but falls back to the module
|
|
# form so the spawn keeps working when PATH is missing the shim.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_resolve_hermes_argv_prefers_path_shim(monkeypatch):
|
|
"""When `hermes` is on PATH, use the shim — preserves familiar ps output."""
|
|
import shutil
|
|
import hermes_cli.kanban_db as kb
|
|
|
|
monkeypatch.delenv("HERMES_BIN", raising=False)
|
|
monkeypatch.setattr(shutil, "which", lambda name: "/usr/local/bin/hermes")
|
|
argv = kb._resolve_hermes_argv()
|
|
assert argv == ["/usr/local/bin/hermes"]
|
|
|
|
|
|
def test_resolve_hermes_argv_absolutizes_relative_exe_shim(monkeypatch, tmp_path):
|
|
"""A relative executable override must not remain workspace-cwd-dependent."""
|
|
import hermes_cli.kanban_db as kb
|
|
|
|
monkeypatch.chdir(tmp_path)
|
|
monkeypatch.setenv("HERMES_BIN", ".\\hermes.exe")
|
|
monkeypatch.setattr(kb, "_IS_WINDOWS", True)
|
|
|
|
assert kb._resolve_hermes_argv() == [os.path.abspath(".\\hermes.exe")]
|
|
|
|
|
|
def test_resolve_hermes_argv_avoids_implicit_windows_batch_shim(monkeypatch, tmp_path):
|
|
"""Implicit .cmd/.bat shims use the module fallback, not batch argv[0]."""
|
|
import sys
|
|
import hermes_cli.kanban_db as kb
|
|
|
|
bin_dir = tmp_path / "bin"
|
|
bin_dir.mkdir()
|
|
(bin_dir / "hermes.CMD").write_text("@echo off\n", encoding="utf-8")
|
|
monkeypatch.delenv("HERMES_BIN", raising=False)
|
|
monkeypatch.setenv("PATH", str(bin_dir))
|
|
monkeypatch.setenv("PATHEXT", ".CMD")
|
|
monkeypatch.setattr(kb, "_IS_WINDOWS", True)
|
|
|
|
assert kb._resolve_hermes_argv() == [sys.executable, "-m", "hermes_cli.main"]
|
|
|
|
|
|
def test_resolve_hermes_argv_honors_hermes_bin_path_override(monkeypatch, tmp_path):
|
|
"""An explicit path-like HERMES_BIN lets service managers pin the executable."""
|
|
import shutil
|
|
import hermes_cli.kanban_db as kb
|
|
|
|
shim = tmp_path / "bin" / "hermes"
|
|
shim.parent.mkdir()
|
|
shim.write_text("#!/bin/sh\n", encoding="utf-8")
|
|
monkeypatch.setenv("HERMES_BIN", str(shim))
|
|
monkeypatch.setattr(shutil, "which", lambda name: None)
|
|
|
|
assert kb._resolve_hermes_argv() == [str(shim)]
|
|
|
|
|
|
def test_resolve_hermes_argv_hermes_bin_bare_name_uses_path(monkeypatch, tmp_path):
|
|
"""Bare HERMES_BIN values keep PATH semantics instead of cwd shadowing."""
|
|
import stat
|
|
import hermes_cli.kanban_db as kb
|
|
|
|
cwd_hermes = tmp_path / "hermes"
|
|
cwd_hermes.write_text("wrong\n", encoding="utf-8")
|
|
cwd_hermes.chmod(cwd_hermes.stat().st_mode | stat.S_IXUSR)
|
|
path_hermes = tmp_path / "bin" / "hermes"
|
|
path_hermes.parent.mkdir()
|
|
path_hermes.write_text("right\n", encoding="utf-8")
|
|
path_hermes.chmod(path_hermes.stat().st_mode | stat.S_IXUSR)
|
|
monkeypatch.chdir(tmp_path)
|
|
monkeypatch.setenv("PATH", str(path_hermes.parent))
|
|
monkeypatch.setenv("HERMES_BIN", "hermes")
|
|
|
|
assert kb._resolve_hermes_argv() == [str(path_hermes)]
|
|
|
|
|
|
def test_resolve_hermes_argv_hermes_bin_bare_name_ignores_cwd(monkeypatch, tmp_path):
|
|
"""Bare HERMES_BIN does not accept current-directory shadow executables."""
|
|
import sys
|
|
import hermes_cli.kanban_db as kb
|
|
|
|
(tmp_path / "hermes.exe").write_text("wrong\n", encoding="utf-8")
|
|
monkeypatch.chdir(tmp_path)
|
|
monkeypatch.setenv("PATH", "")
|
|
monkeypatch.setenv("HERMES_BIN", "hermes")
|
|
monkeypatch.setattr(kb, "_IS_WINDOWS", True)
|
|
|
|
assert kb._resolve_hermes_argv() == [sys.executable, "-m", "hermes_cli.main"]
|
|
|
|
|
|
def test_resolve_hermes_argv_hermes_bin_bare_cmd_uses_module_fallback(monkeypatch, tmp_path):
|
|
"""A PATH-resolved HERMES_BIN batch shim is not used as worker argv[0]."""
|
|
import sys
|
|
import hermes_cli.kanban_db as kb
|
|
|
|
bin_dir = tmp_path / "bin"
|
|
bin_dir.mkdir()
|
|
(bin_dir / "hermes.CMD").write_text("@echo off\n", encoding="utf-8")
|
|
monkeypatch.setenv("PATH", str(bin_dir))
|
|
monkeypatch.setenv("PATHEXT", ".CMD")
|
|
monkeypatch.setenv("HERMES_BIN", "hermes")
|
|
monkeypatch.setattr(kb, "_IS_WINDOWS", True)
|
|
|
|
assert kb._resolve_hermes_argv() == [sys.executable, "-m", "hermes_cli.main"]
|
|
|
|
|
|
def test_resolve_hermes_argv_hermes_bin_unresolved_bare_name_falls_back(monkeypatch):
|
|
"""Unresolved HERMES_BIN command names do not delegate cwd search to Popen."""
|
|
import sys
|
|
import hermes_cli.kanban_db as kb
|
|
|
|
monkeypatch.setenv("PATH", "")
|
|
monkeypatch.setenv("HERMES_BIN", "hermes")
|
|
|
|
assert kb._resolve_hermes_argv() == [sys.executable, "-m", "hermes_cli.main"]
|
|
|
|
|
|
def test_resolve_hermes_argv_falls_back_to_module_form_when_no_path_shim(monkeypatch):
|
|
"""When the shim is not on PATH, fall back to `python -m hermes_cli.main`.
|
|
|
|
Pins the correct module name (NOT `hermes` — there is no top-level
|
|
`hermes` package). Regression for #23198: the original PR shipped
|
|
`python -m hermes` which fails with `No module named hermes` on every
|
|
invocation.
|
|
"""
|
|
import shutil
|
|
import sys
|
|
import hermes_cli.kanban_db as kb
|
|
|
|
monkeypatch.delenv("HERMES_BIN", raising=False)
|
|
monkeypatch.setattr(shutil, "which", lambda name: None)
|
|
argv = kb._resolve_hermes_argv()
|
|
assert argv == [sys.executable, "-m", "hermes_cli.main"]
|
|
|
|
|
|
def test_resolve_hermes_argv_module_actually_runs():
|
|
"""The fallback module name must be importable + runnable.
|
|
|
|
A unit test that pins the literal string is necessary but not
|
|
sufficient — if `hermes_cli.main` ever loses `if __name__ == "__main__"`
|
|
handling or its argparse setup, `python -m hermes_cli.main --version`
|
|
would fail and so would every dispatcher spawn that hits the fallback.
|
|
Run it as a real subprocess to catch that regression.
|
|
"""
|
|
import subprocess
|
|
import sys
|
|
import hermes_cli.kanban_db as kb
|
|
import shutil
|
|
import unittest.mock as mock
|
|
|
|
with mock.patch.dict(os.environ, {}, clear=False):
|
|
os.environ.pop("HERMES_BIN", None)
|
|
with mock.patch.object(shutil, "which", return_value=None):
|
|
argv = kb._resolve_hermes_argv()
|
|
r = subprocess.run(argv + ["--version"], capture_output=True, text=True, timeout=30)
|
|
assert r.returncode == 0, (
|
|
f"`{' '.join(argv)} --version` failed (rc={r.returncode}); "
|
|
f"stderr={r.stderr[:200]!r}"
|
|
)
|
|
assert "Hermes Agent" in r.stdout, f"unexpected output: {r.stdout[:200]!r}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# task_age — guard against corrupt timestamp values
|
|
#
|
|
# The Task dataclass declares ``created_at: int`` but rows come from sqlite
|
|
# without coercion at the boundary. A row that ever held a non-int (e.g. an
|
|
# unsubstituted ``'%s'`` from a logged format string, ``None``, an arbitrary
|
|
# string, or a float-as-string) used to crash ``task_age`` with ``ValueError``
|
|
# and turn ``GET /api/plugins/kanban/board`` into a 500 because the dashboard
|
|
# calls ``task_age`` unguarded for every task in the response.
|
|
#
|
|
# After the fix, ``_safe_int`` returns ``None`` on bad input and ``task_age``
|
|
# degrades gracefully (per-field ``None`` rather than a hard crash).
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_task(**overrides) -> "kb.Task":
|
|
"""Minimal Task with all required fields filled in. Override anything."""
|
|
defaults = dict(
|
|
id="t_age",
|
|
title="x",
|
|
body=None,
|
|
assignee=None,
|
|
status="ready",
|
|
priority=0,
|
|
created_by=None,
|
|
created_at=0,
|
|
started_at=None,
|
|
completed_at=None,
|
|
workspace_kind="scratch",
|
|
workspace_path=None,
|
|
claim_lock=None,
|
|
claim_expires=None,
|
|
tenant=None,
|
|
)
|
|
defaults.update(overrides)
|
|
return kb.Task(**defaults)
|
|
|
|
|
|
def test_safe_int_accepts_int_and_int_string():
|
|
"""Sanity: well-typed values pass through."""
|
|
# PR d8ad431de renamed _safe_int → _to_epoch (now also handles ISO-8601).
|
|
assert kb._to_epoch(0) == 0
|
|
assert kb._to_epoch(1700000000) == 1700000000
|
|
assert kb._to_epoch("1700000000") == 1700000000
|
|
|
|
|
|
def test_safe_int_returns_none_on_corrupt_inputs():
|
|
"""All the failure modes that used to crash task_age."""
|
|
# None — common when the column was never written
|
|
assert kb._to_epoch(None) is None
|
|
# Unsubstituted format string — the literal case the PR title cites
|
|
assert kb._to_epoch("%s") is None
|
|
# Arbitrary non-numeric strings
|
|
assert kb._to_epoch("abc") is None
|
|
assert kb._to_epoch("") is None
|
|
# Float-ish strings: int("1.5") raises ValueError too — caller wants None.
|
|
assert kb._to_epoch("1.5") is None
|
|
# Random object — covered by TypeError branch
|
|
assert kb._to_epoch(object()) is None
|
|
|
|
|
|
def test_task_age_handles_corrupt_created_at():
|
|
"""Pre-fix this raised ValueError and 500'd /api/plugins/kanban/board."""
|
|
t = _make_task(created_at="%s")
|
|
age = kb.task_age(t)
|
|
assert age["created_age_seconds"] is None
|
|
assert age["started_age_seconds"] is None
|
|
assert age["time_to_complete_seconds"] is None
|
|
|
|
|
|
def test_task_age_handles_corrupt_started_and_completed():
|
|
"""All three timestamp fields share the same _safe_int treatment."""
|
|
t = _make_task(
|
|
created_at=1700000000,
|
|
started_at="garbage",
|
|
completed_at=None,
|
|
)
|
|
age = kb.task_age(t)
|
|
assert isinstance(age["created_age_seconds"], int)
|
|
assert age["started_age_seconds"] is None
|
|
assert age["time_to_complete_seconds"] is None
|
|
|
|
|
|
def test_task_age_well_formed_task():
|
|
"""Regression: the safe-int path must not change behavior for normal data."""
|
|
import time
|
|
now = int(time.time())
|
|
t = _make_task(
|
|
created_at=now - 60,
|
|
started_at=now - 30,
|
|
completed_at=now,
|
|
)
|
|
age = kb.task_age(t)
|
|
assert 55 <= age["created_age_seconds"] <= 65
|
|
assert 25 <= age["started_age_seconds"] <= 35
|
|
assert 25 <= age["time_to_complete_seconds"] <= 35
|
|
|
|
|
|
def test_task_dict_survives_corrupt_created_at(tmp_path, monkeypatch):
|
|
"""Defense in depth: even if task_age ever raised, plugin_api must not 500.
|
|
|
|
The PR also added a try/except around the task_age call in
|
|
`plugins/kanban/dashboard/plugin_api.py::_task_dict`. Verify a single
|
|
corrupt row doesn't turn the whole board response into an error.
|
|
"""
|
|
# Set up an isolated kanban home so we can write a corrupt created_at.
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
monkeypatch.setenv("HERMES_HOME", str(home))
|
|
monkeypatch.setattr("pathlib.Path.home", lambda: tmp_path)
|
|
kb._INITIALIZED_PATHS.clear()
|
|
kb.init_db()
|
|
|
|
# Insert a row with a non-int created_at (simulates the historical
|
|
# bug that produced corrupt rows).
|
|
conn = kb.connect()
|
|
try:
|
|
good_id = kb.create_task(conn, title="good")
|
|
# Now write a row with corrupt created_at directly.
|
|
conn.execute(
|
|
"UPDATE tasks SET created_at = ? WHERE id = ?",
|
|
("%s", good_id),
|
|
)
|
|
finally:
|
|
conn.close()
|
|
|
|
# Re-read and pass through task_age — must not raise.
|
|
conn = kb.connect()
|
|
try:
|
|
task = kb.get_task(conn, good_id)
|
|
finally:
|
|
conn.close()
|
|
age = kb.task_age(task)
|
|
assert age["created_age_seconds"] is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Board-level default_workdir
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_create_task_without_workspace_inherits_board_default_workdir(kanban_home, monkeypatch):
|
|
"""Board with default_workdir → create_task without workspace_path → inherits default."""
|
|
default_wd = "/home/user/project"
|
|
kb.create_board("work-proj", default_workdir=default_wd)
|
|
|
|
with kb.connect(board="work-proj") as conn:
|
|
tid = kb.create_task(conn, title="inherited", board="work-proj")
|
|
t = kb.get_task(conn, tid)
|
|
assert t is not None
|
|
assert t.workspace_path == default_wd
|
|
|
|
|
|
def test_create_task_without_workspace_no_default_stays_none(kanban_home):
|
|
"""Board without default_workdir → create_task without workspace_path → stays None."""
|
|
kb.create_board("empty-board")
|
|
|
|
with kb.connect(board="empty-board") as conn:
|
|
tid = kb.create_task(conn, title="none", board="empty-board")
|
|
t = kb.get_task(conn, tid)
|
|
assert t is not None
|
|
assert t.workspace_path is None
|
|
|
|
|
|
def test_create_task_with_explicit_workspace_ignores_board_default(kanban_home):
|
|
"""create_task with explicit workspace_path → ignores board default."""
|
|
kb.create_board("custom-ws-board", default_workdir="/board/default")
|
|
|
|
explicit = "/my/explicit/path"
|
|
with kb.connect(board="custom-ws-board") as conn:
|
|
tid = kb.create_task(conn, title="explicit", workspace_path=explicit, board="custom-ws-board")
|
|
t = kb.get_task(conn, tid)
|
|
assert t is not None
|
|
assert t.workspace_path == explicit
|
|
assert t.workspace_path != "/board/default"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# dispatch_once — max_in_progress
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_dispatch_max_in_progress_skips_when_at_limit(kanban_home, all_assignees_spawnable):
|
|
"""When max_in_progress=N and N tasks are already running, spawn nothing."""
|
|
spawns = []
|
|
|
|
def fake_spawn(task, workspace):
|
|
spawns.append(task.id)
|
|
|
|
with kb.connect() as conn:
|
|
# Two running tasks.
|
|
t1 = kb.create_task(conn, title="a", assignee="alice")
|
|
t2 = kb.create_task(conn, title="b", assignee="bob")
|
|
kb.claim_task(conn, t1)
|
|
kb.claim_task(conn, t2)
|
|
# Two more ready to spawn — but cap is 2 so none should fire.
|
|
kb.create_task(conn, title="c", assignee="bob")
|
|
kb.create_task(conn, title="d", assignee="alice")
|
|
kb.dispatch_once(conn, spawn_fn=fake_spawn, max_in_progress=2)
|
|
|
|
assert len(spawns) == 0, f"expected 0 spawns, got {len(spawns)}"
|
|
|
|
|
|
def test_dispatch_max_in_progress_spawns_up_to_cap(kanban_home, all_assignees_spawnable):
|
|
"""When max_in_progress=3 and only 1 is running, spawn up to 2 more."""
|
|
spawns = []
|
|
|
|
def fake_spawn(task, workspace):
|
|
spawns.append(task.id)
|
|
|
|
with kb.connect() as conn:
|
|
# One running task.
|
|
t1 = kb.create_task(conn, title="a", assignee="alice")
|
|
kb.claim_task(conn, t1)
|
|
# Three ready tasks — only the first 2 should be spawned.
|
|
kb.create_task(conn, title="b", assignee="bob")
|
|
kb.create_task(conn, title="c", assignee="bob")
|
|
kb.create_task(conn, title="d", assignee="bob")
|
|
kb.dispatch_once(conn, spawn_fn=fake_spawn, max_in_progress=3)
|
|
|
|
assert len(spawns) == 2, f"expected 2 spawns (cap 3 - 1 running), got {len(spawns)}"
|
|
|
|
|
|
def test_dispatch_max_in_progress_none_is_unlimited(kanban_home, all_assignees_spawnable):
|
|
"""Default None means no limit — all ready tasks are spawned."""
|
|
spawns = []
|
|
|
|
def fake_spawn(task, workspace):
|
|
spawns.append(task.id)
|
|
|
|
with kb.connect() as conn:
|
|
for title in ["a", "b", "c", "d"]:
|
|
kb.create_task(conn, title=title, assignee="alice")
|
|
kb.dispatch_once(conn, spawn_fn=fake_spawn, max_in_progress=None)
|
|
|
|
assert len(spawns) == 4, f"expected 4 spawns (unlimited), got {len(spawns)}"
|
|
|
|
# Review column dispatch
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _set_task_status(conn: sqlite3.Connection, task_id: str, status: str) -> None:
|
|
"""Test helper: set a task's status directly."""
|
|
conn.execute("UPDATE tasks SET status = ? WHERE id = ?", (status, task_id))
|
|
|
|
|
|
def test_claim_review_task_transitions_to_running(kanban_home):
|
|
"""claim_review_task atomically transitions review -> running."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="review me", assignee="alice")
|
|
_set_task_status(conn, t, "review")
|
|
claimed = kb.claim_review_task(conn, t)
|
|
assert claimed is not None
|
|
assert claimed.status == "running"
|
|
assert claimed.claim_lock is not None
|
|
|
|
|
|
def test_claim_review_task_fails_on_non_review(kanban_home):
|
|
"""claim_review_task returns None if task is not in review status."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="ready task", assignee="alice")
|
|
# Task is in 'ready', not 'review'
|
|
claimed = kb.claim_review_task(conn, t)
|
|
assert claimed is None
|
|
|
|
|
|
def test_claim_review_task_fails_when_already_claimed(kanban_home):
|
|
"""claim_review_task returns None if the task was already claimed."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="review me", assignee="alice")
|
|
_set_task_status(conn, t, "review")
|
|
first = kb.claim_review_task(conn, t)
|
|
assert first is not None
|
|
second = kb.claim_review_task(conn, t)
|
|
assert second is None
|
|
|
|
|
|
def test_dispatch_review_dry_run(kanban_home, all_assignees_spawnable):
|
|
"""dispatch_once dry-run sees review tasks and reports them as spawned."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="review me", assignee="alice")
|
|
_set_task_status(conn, t, "review")
|
|
res = kb.dispatch_once(conn, dry_run=True)
|
|
assert len(res.spawned) == 1
|
|
assert res.spawned[0][0] == t
|
|
# Dry run must NOT mutate status.
|
|
with kb.connect() as conn:
|
|
assert kb.get_task(conn, t).status == "review"
|
|
|
|
|
|
def test_dispatch_review_spawns_with_correct_skills(
|
|
kanban_home, all_assignees_spawnable,
|
|
):
|
|
"""Review tasks get sdlc-review skill set before spawning."""
|
|
spawned_tasks = []
|
|
|
|
def capture_spawn(task, workspace, board=None):
|
|
spawned_tasks.append(task)
|
|
return 42 # fake PID
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="review me", assignee="alice")
|
|
_set_task_status(conn, t, "review")
|
|
res = kb.dispatch_once(conn, spawn_fn=capture_spawn)
|
|
assert len(res.spawned) == 1
|
|
assert len(spawned_tasks) == 1
|
|
assert spawned_tasks[0].skills == ["sdlc-review"]
|
|
|
|
|
|
def test_dispatch_review_skips_unassigned(kanban_home):
|
|
"""Unassigned review tasks go to skipped_unassigned, not spawned."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="review floater")
|
|
_set_task_status(conn, t, "review")
|
|
res = kb.dispatch_once(conn, dry_run=True)
|
|
assert t in res.skipped_unassigned
|
|
assert not res.spawned
|
|
|
|
|
|
def test_dispatch_review_counts_toward_max_spawn(
|
|
kanban_home, all_assignees_spawnable,
|
|
):
|
|
"""Review spawns count against max_spawn alongside ready tasks."""
|
|
spawns = []
|
|
|
|
def fake_spawn(task, workspace, board=None):
|
|
spawns.append(task.id)
|
|
return 42
|
|
|
|
with kb.connect() as conn:
|
|
# Create 2 ready tasks + 1 review task, max_spawn=2
|
|
t1 = kb.create_task(conn, title="ready 1", assignee="alice")
|
|
t2 = kb.create_task(conn, title="ready 2", assignee="bob")
|
|
t3 = kb.create_task(conn, title="review", assignee="alice")
|
|
_set_task_status(conn, t3, "review")
|
|
res = kb.dispatch_once(conn, spawn_fn=fake_spawn, max_spawn=2)
|
|
# Only 2 should spawn (ready tasks get priority in the loop)
|
|
assert len(res.spawned) == 2
|
|
assert len(spawns) == 2
|
|
|
|
|
|
def test_dispatch_review_spawns_when_ready_empty(
|
|
kanban_home, all_assignees_spawnable,
|
|
):
|
|
"""When only review tasks exist, they still get dispatched."""
|
|
spawns = []
|
|
|
|
def fake_spawn(task, workspace, board=None):
|
|
spawns.append(task.id)
|
|
return 42
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="review me", assignee="alice")
|
|
_set_task_status(conn, t, "review")
|
|
res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
|
|
assert len(res.spawned) == 1
|
|
assert spawns[0] == t
|
|
|
|
|
|
def test_has_spawnable_review_true(kanban_home):
|
|
"""has_spawnable_review returns True when review tasks exist with real profiles."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="review me", assignee="default")
|
|
_set_task_status(conn, t, "review")
|
|
# default profile should exist in the test env
|
|
assert kb.has_spawnable_review(conn) is True
|
|
|
|
|
|
def test_has_spawnable_review_false_on_empty(kanban_home):
|
|
"""has_spawnable_review returns False when no review tasks exist."""
|
|
with kb.connect() as conn:
|
|
assert kb.has_spawnable_review(conn) is False
|
|
|
|
|
|
def test_has_spawnable_review_false_when_only_terminal_lanes(
|
|
kanban_home, monkeypatch,
|
|
):
|
|
"""has_spawnable_review returns False when review tasks are terminal lanes."""
|
|
from hermes_cli import profiles
|
|
monkeypatch.setattr(profiles, "profile_exists", lambda name: False)
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="review", assignee="orion-cc")
|
|
_set_task_status(conn, t, "review")
|
|
assert kb.has_spawnable_review(conn) is False
|
|
|
|
|
|
def test_dispatch_review_skips_nonspawnable(kanban_home, monkeypatch):
|
|
"""Review tasks with non-existent profiles go to skipped_nonspawnable."""
|
|
from hermes_cli import profiles
|
|
monkeypatch.setattr(profiles, "profile_exists", lambda name: False)
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="review", assignee="orion-cc")
|
|
_set_task_status(conn, t, "review")
|
|
res = kb.dispatch_once(conn, dry_run=True)
|
|
assert t in res.skipped_nonspawnable
|
|
assert not res.spawned
|
|
|
|
|
|
def test_review_status_in_valid_statuses():
|
|
"""'review' is a valid task status."""
|
|
assert "review" in kb.VALID_STATUSES
|
|
|
|
|
|
def test_dispatch_review_does_not_claim_ready_tasks(
|
|
kanban_home, all_assignees_spawnable,
|
|
):
|
|
"""Review dispatch uses claim_review_task, which only claims review tasks."""
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="ready task", assignee="alice")
|
|
# claim_review_task should NOT claim a ready task
|
|
claimed = kb.claim_review_task(conn, t)
|
|
assert claimed is None
|
|
|
|
# Stale detection — detect_stale_running
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_detect_stale_returns_running_task_with_no_heartbeat(kanban_home, monkeypatch):
|
|
"""A task running > timeout with zero heartbeats gets reclaimed as stale."""
|
|
import hermes_cli.kanban_db as _kb
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="stale-no-hb", assignee="worker")
|
|
kb.claim_task(conn, t)
|
|
kb._set_worker_pid(conn, t, os.getpid())
|
|
|
|
# Rewind started_at so the task appears to have been running for 5 hours.
|
|
five_hours_ago = int(time.time()) - (5 * 3600)
|
|
with kb.write_txn(conn):
|
|
conn.execute(
|
|
"UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t)
|
|
)
|
|
conn.execute(
|
|
"UPDATE task_runs SET started_at = ? "
|
|
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
|
|
(five_hours_ago, t),
|
|
)
|
|
# No heartbeat set — last_heartbeat_at stays NULL.
|
|
|
|
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
|
|
killed = []
|
|
stale = kb.detect_stale_running(
|
|
conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: killed.append(s),
|
|
)
|
|
assert t in stale, "Task with no heartbeat for >4h should be reclaimed"
|
|
task = kb.get_task(conn, t)
|
|
assert task.status == "ready"
|
|
|
|
|
|
def test_detect_stale_returns_task_with_stale_heartbeat(kanban_home, monkeypatch):
|
|
"""A task running > timeout with a heartbeat older than 1h gets reclaimed."""
|
|
import hermes_cli.kanban_db as _kb
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="stale-hb", assignee="worker")
|
|
kb.claim_task(conn, t)
|
|
kb._set_worker_pid(conn, t, os.getpid())
|
|
|
|
five_hours_ago = int(time.time()) - (5 * 3600)
|
|
heartbeat_2h_ago = int(time.time()) - (2 * 3600)
|
|
with kb.write_txn(conn):
|
|
conn.execute(
|
|
"UPDATE tasks SET started_at = ?, last_heartbeat_at = ? "
|
|
"WHERE id = ?",
|
|
(five_hours_ago, heartbeat_2h_ago, t),
|
|
)
|
|
conn.execute(
|
|
"UPDATE task_runs SET started_at = ? "
|
|
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
|
|
(five_hours_ago, t),
|
|
)
|
|
|
|
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
|
|
stale = kb.detect_stale_running(
|
|
conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: None,
|
|
)
|
|
assert t in stale, (
|
|
"Task with heartbeat >1h old and started >4h ago should be stale"
|
|
)
|
|
assert kb.get_task(conn, t).status == "ready"
|
|
|
|
|
|
def test_detect_stale_skips_task_with_recent_heartbeat(kanban_home, monkeypatch):
|
|
"""A task running > timeout but with a recent heartbeat is NOT reclaimed."""
|
|
import hermes_cli.kanban_db as _kb
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="alive-hb", assignee="worker")
|
|
kb.claim_task(conn, t)
|
|
kb._set_worker_pid(conn, t, os.getpid())
|
|
|
|
five_hours_ago = int(time.time()) - (5 * 3600)
|
|
heartbeat_now = int(time.time()) # heartbeat just happened
|
|
with kb.write_txn(conn):
|
|
conn.execute(
|
|
"UPDATE tasks SET started_at = ?, last_heartbeat_at = ? "
|
|
"WHERE id = ?",
|
|
(five_hours_ago, heartbeat_now, t),
|
|
)
|
|
conn.execute(
|
|
"UPDATE task_runs SET started_at = ? "
|
|
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
|
|
(five_hours_ago, t),
|
|
)
|
|
|
|
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: True)
|
|
stale = kb.detect_stale_running(
|
|
conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: None,
|
|
)
|
|
assert stale == [], "Task with recent heartbeat should not be reclaimed"
|
|
assert kb.get_task(conn, t).status == "running"
|
|
|
|
|
|
def test_detect_stale_skips_recently_started_task(kanban_home, monkeypatch):
|
|
"""A task started < timeout ago is NOT reclaimed even with no heartbeat."""
|
|
import hermes_cli.kanban_db as _kb
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="fresh", assignee="worker")
|
|
kb.claim_task(conn, t)
|
|
kb._set_worker_pid(conn, t, os.getpid())
|
|
|
|
# Started only 1 hour ago — well within the 4h threshold.
|
|
one_hour_ago = int(time.time()) - 3600
|
|
with kb.write_txn(conn):
|
|
conn.execute(
|
|
"UPDATE tasks SET started_at = ? WHERE id = ?", (one_hour_ago, t)
|
|
)
|
|
conn.execute(
|
|
"UPDATE task_runs SET started_at = ? "
|
|
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
|
|
(one_hour_ago, t),
|
|
)
|
|
|
|
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: True)
|
|
stale = kb.detect_stale_running(
|
|
conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: None,
|
|
)
|
|
assert stale == [], "Task started <4h ago should not be reclaimed"
|
|
assert kb.get_task(conn, t).status == "running"
|
|
|
|
|
|
def test_detect_stale_skips_when_timeout_zero(kanban_home, monkeypatch):
|
|
"""stale_timeout_seconds=0 disables stale detection entirely."""
|
|
import hermes_cli.kanban_db as _kb
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="disabled", assignee="worker")
|
|
kb.claim_task(conn, t)
|
|
kb._set_worker_pid(conn, t, os.getpid())
|
|
|
|
five_hours_ago = int(time.time()) - (5 * 3600)
|
|
with kb.write_txn(conn):
|
|
conn.execute(
|
|
"UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t)
|
|
)
|
|
conn.execute(
|
|
"UPDATE task_runs SET started_at = ? "
|
|
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
|
|
(five_hours_ago, t),
|
|
)
|
|
|
|
stale = kb.detect_stale_running(
|
|
conn, stale_timeout_seconds=0, signal_fn=lambda p, s: None,
|
|
)
|
|
assert stale == [], "timeout=0 should disable stale detection"
|
|
assert kb.get_task(conn, t).status == "running"
|
|
|
|
|
|
def test_detect_stale_skips_blocked_tasks(kanban_home, monkeypatch):
|
|
"""Blocked tasks are NOT reclaimed by stale detection."""
|
|
import hermes_cli.kanban_db as _kb
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="blocked-task", assignee="worker")
|
|
kb.claim_task(conn, t)
|
|
kb._set_worker_pid(conn, t, os.getpid())
|
|
|
|
five_hours_ago = int(time.time()) - (5 * 3600)
|
|
with kb.write_txn(conn):
|
|
conn.execute(
|
|
"UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t)
|
|
)
|
|
conn.execute(
|
|
"UPDATE task_runs SET started_at = ? "
|
|
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
|
|
(five_hours_ago, t),
|
|
)
|
|
# Block the task explicitly.
|
|
kb.block_task(conn, t, reason="human requested block")
|
|
|
|
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
|
|
stale = kb.detect_stale_running(
|
|
conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: None,
|
|
)
|
|
assert stale == [], "Blocked task should not be reclaimed by stale detection"
|
|
assert kb.get_task(conn, t).status == "blocked"
|
|
|
|
|
|
def test_detect_stale_does_not_tick_failure_counter(kanban_home, monkeypatch):
|
|
"""Stale reclaim must NOT tick consecutive_failures.
|
|
|
|
Stale detection is dispatcher-side absence-of-heartbeat detection,
|
|
not a worker failure. Counting it as a failure would let two
|
|
legitimately-long-running tasks (>4h without explicit heartbeat) trip
|
|
the circuit breaker and auto-block at the default failure_limit=2,
|
|
even though no worker actually failed. The 'stale' event in
|
|
task_events is the right audit surface; the consecutive_failures
|
|
counter is reserved for spawn_failed / timed_out / crashed.
|
|
"""
|
|
import hermes_cli.kanban_db as _kb
|
|
|
|
with kb.connect() as conn:
|
|
t = kb.create_task(conn, title="stale-no-counter-tick", assignee="worker")
|
|
kb.claim_task(conn, t)
|
|
kb._set_worker_pid(conn, t, os.getpid())
|
|
|
|
five_hours_ago = int(time.time()) - (5 * 3600)
|
|
with kb.write_txn(conn):
|
|
conn.execute(
|
|
"UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t)
|
|
)
|
|
conn.execute(
|
|
"UPDATE task_runs SET started_at = ? "
|
|
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
|
|
(five_hours_ago, t),
|
|
)
|
|
# Counter starts at 0; assert that's our baseline.
|
|
row = conn.execute(
|
|
"SELECT consecutive_failures FROM tasks WHERE id = ?", (t,)
|
|
).fetchone()
|
|
assert row["consecutive_failures"] in (0, None)
|
|
|
|
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
|
|
stale = kb.detect_stale_running(
|
|
conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: None,
|
|
)
|
|
assert t in stale, "Task should be reclaimed by stale detection"
|
|
|
|
# Critical assertion: the failure counter MUST NOT have ticked.
|
|
# Stale reclaim resets to ready for re-dispatch without penalty.
|
|
row = conn.execute(
|
|
"SELECT consecutive_failures FROM tasks WHERE id = ?", (t,)
|
|
).fetchone()
|
|
assert row["consecutive_failures"] in (0, None), (
|
|
f"Stale reclaim ticked consecutive_failures to "
|
|
f"{row['consecutive_failures']!r}; should remain 0/NULL."
|
|
)
|
|
|
|
# And the audit trail still records the stale event so operators
|
|
# can see what happened.
|
|
events = conn.execute(
|
|
"SELECT kind FROM task_events WHERE task_id = ? ORDER BY id",
|
|
(t,),
|
|
).fetchall()
|
|
kinds = [e["kind"] for e in events]
|
|
assert "stale" in kinds, (
|
|
f"Expected 'stale' event in task_events; got {kinds!r}"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Corruption guard (issue #30687)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _write_corrupt_db(path: Path) -> bytes:
|
|
"""Write a kanban DB with a VALID SQLite header but malformed page content.
|
|
|
|
This is the corruption shape the integrity guard specifically targets
|
|
(e.g. issue #29507 follow-up reports where the file's first 16 bytes
|
|
pass the header byte check but ``PRAGMA integrity_check`` then fails
|
|
because the internal pages are damaged). It's what main's header-only
|
|
validator was letting through, and what this PR adds the full guard
|
|
for.
|
|
"""
|
|
# 100-byte SQLite header (magic + minimal valid-looking fields) so the
|
|
# cheap header check passes, then deliberate garbage so sqlite refuses
|
|
# to read the file past the header.
|
|
header = b"SQLite format 3\x00" + b"\x10\x00\x02\x02\x00\x40\x20\x20"
|
|
header += b"\x00\x00\x00\x0c\x00\x00\x23\x46\x00\x00\x00\x00"
|
|
header = header.ljust(100, b"\x00")
|
|
payload = b"definitely not a valid sqlite page \x00\x01\x02\x03" * 64
|
|
blob = header + payload
|
|
path.write_bytes(blob)
|
|
return blob
|
|
|
|
|
|
def test_init_db_refuses_corrupt_existing_file(tmp_path):
|
|
db_path = tmp_path / "kanban.db"
|
|
original = _write_corrupt_db(db_path)
|
|
# Ensure the cache doesn't mask the guard.
|
|
kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))
|
|
|
|
with pytest.raises(kb.KanbanDbCorruptError) as excinfo:
|
|
kb.init_db(db_path=db_path)
|
|
|
|
err = excinfo.value
|
|
assert err.db_path == db_path
|
|
assert err.backup_path is not None
|
|
assert err.backup_path.exists()
|
|
assert err.backup_path.read_bytes() == original
|
|
# Original bytes untouched — no schema was written on top.
|
|
assert db_path.read_bytes() == original
|
|
assert str(db_path) in str(err)
|
|
assert str(err.backup_path) in str(err)
|
|
|
|
|
|
def test_connect_refuses_corrupt_existing_file(tmp_path):
|
|
db_path = tmp_path / "kanban.db"
|
|
_write_corrupt_db(db_path)
|
|
kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))
|
|
|
|
with pytest.raises(kb.KanbanDbCorruptError):
|
|
kb.connect(db_path=db_path)
|
|
|
|
|
|
def test_locked_healthy_db_does_not_classify_as_corrupt(tmp_path, monkeypatch):
|
|
"""A transient lock during the probe must not produce a .corrupt backup
|
|
and must not be reported as :class:`KanbanDbCorruptError`. Raw sqlite
|
|
``OperationalError`` (lock/busy) is acceptable and expected."""
|
|
db_path = tmp_path / "kanban.db"
|
|
kb.init_db(db_path=db_path)
|
|
kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))
|
|
|
|
real_connect = sqlite3.connect
|
|
|
|
def flaky_connect(*args, **kwargs):
|
|
# First call is the integrity probe — simulate a lock.
|
|
raise sqlite3.OperationalError("database is locked")
|
|
|
|
monkeypatch.setattr(kb.sqlite3, "connect", flaky_connect)
|
|
|
|
with pytest.raises(sqlite3.OperationalError):
|
|
kb.connect(db_path=db_path)
|
|
|
|
# No .corrupt backup may be produced for a healthy-but-locked DB.
|
|
backups = list(tmp_path.glob("*.corrupt.*"))
|
|
assert backups == [], f"unexpected corrupt backups: {backups}"
|
|
|
|
# And once the lock clears, normal access still works.
|
|
monkeypatch.setattr(kb.sqlite3, "connect", real_connect)
|
|
with kb.connect(db_path=db_path) as conn:
|
|
kb.create_task(conn, title="still here")
|
|
titles = [t.title for t in kb.list_tasks(conn)]
|
|
assert "still here" in titles
|
|
|
|
|
|
def test_init_db_allows_missing_then_healthy(tmp_path):
|
|
db_path = tmp_path / "fresh.db"
|
|
assert not db_path.exists()
|
|
kb.init_db(db_path=db_path)
|
|
assert db_path.exists() and db_path.stat().st_size > 0
|
|
|
|
# Idempotent on a healthy DB: data survives a second init.
|
|
with kb.connect(db_path=db_path) as conn:
|
|
kb.create_task(conn, title="keeps")
|
|
kb.init_db(db_path=db_path)
|
|
with kb.connect(db_path=db_path) as conn:
|
|
tasks = kb.list_tasks(conn)
|
|
assert [t.title for t in tasks] == ["keeps"]
|