"""Tests for the Kanban DB layer (hermes_cli.kanban_db).""" from __future__ import annotations import concurrent.futures import os import time from pathlib import Path import pytest from hermes_cli import kanban_db as kb @pytest.fixture def kanban_home(tmp_path, monkeypatch): """Isolated HERMES_HOME with an empty kanban DB.""" home = tmp_path / ".hermes" home.mkdir() monkeypatch.setenv("HERMES_HOME", str(home)) monkeypatch.setattr(Path, "home", lambda: tmp_path) kb.init_db() return home # --------------------------------------------------------------------------- # Schema / init # --------------------------------------------------------------------------- def test_init_db_is_idempotent(kanban_home): # Second call should not error or drop data. with kb.connect() as conn: kb.create_task(conn, title="persisted") kb.init_db() with kb.connect() as conn: tasks = kb.list_tasks(conn) assert len(tasks) == 1 assert tasks[0].title == "persisted" def test_init_creates_expected_tables(kanban_home): with kb.connect() as conn: rows = conn.execute( "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" ).fetchall() names = {r["name"] for r in rows} assert {"tasks", "task_links", "task_comments", "task_events"} <= names # --------------------------------------------------------------------------- # Task creation + status inference # --------------------------------------------------------------------------- def test_create_task_no_parents_is_ready(kanban_home): with kb.connect() as conn: tid = kb.create_task(conn, title="ship it", assignee="alice") t = kb.get_task(conn, tid) assert t is not None assert t.status == "ready" assert t.assignee == "alice" assert t.workspace_kind == "scratch" def test_create_task_with_parent_is_todo_until_parent_done(kanban_home): with kb.connect() as conn: p = kb.create_task(conn, title="parent") c = kb.create_task(conn, title="child", parents=[p]) assert kb.get_task(conn, c).status == "todo" kb.complete_task(conn, p, result="ok") assert 
kb.get_task(conn, c).status == "ready" def test_create_task_unknown_parent_errors(kanban_home): with kb.connect() as conn, pytest.raises(ValueError, match="unknown parent"): kb.create_task(conn, title="orphan", parents=["t_ghost"]) def test_workspace_kind_validation(kanban_home): with kb.connect() as conn, pytest.raises(ValueError, match="workspace_kind"): kb.create_task(conn, title="bad ws", workspace_kind="cloud") # --------------------------------------------------------------------------- # Links + dependency resolution # --------------------------------------------------------------------------- def test_link_demotes_ready_child_to_todo_when_parent_not_done(kanban_home): with kb.connect() as conn: a = kb.create_task(conn, title="a") b = kb.create_task(conn, title="b") assert kb.get_task(conn, b).status == "ready" kb.link_tasks(conn, a, b) assert kb.get_task(conn, b).status == "todo" def test_link_keeps_ready_child_when_parent_already_done(kanban_home): with kb.connect() as conn: a = kb.create_task(conn, title="a") kb.complete_task(conn, a) b = kb.create_task(conn, title="b") assert kb.get_task(conn, b).status == "ready" kb.link_tasks(conn, a, b) assert kb.get_task(conn, b).status == "ready" def test_link_rejects_self_loop(kanban_home): with kb.connect() as conn: a = kb.create_task(conn, title="a") with pytest.raises(ValueError, match="itself"): kb.link_tasks(conn, a, a) def test_link_detects_cycle(kanban_home): with kb.connect() as conn: a = kb.create_task(conn, title="a") b = kb.create_task(conn, title="b", parents=[a]) c = kb.create_task(conn, title="c", parents=[b]) with pytest.raises(ValueError, match="cycle"): kb.link_tasks(conn, c, a) with pytest.raises(ValueError, match="cycle"): kb.link_tasks(conn, b, a) def test_recompute_ready_cascades_through_chain(kanban_home): with kb.connect() as conn: a = kb.create_task(conn, title="a") b = kb.create_task(conn, title="b", parents=[a]) c = kb.create_task(conn, title="c", parents=[b]) assert [kb.get_task(conn, 
x).status for x in (a, b, c)] == \ ["ready", "todo", "todo"] kb.complete_task(conn, a) assert kb.get_task(conn, b).status == "ready" kb.complete_task(conn, b) assert kb.get_task(conn, c).status == "ready" def test_recompute_ready_fan_in_waits_for_all_parents(kanban_home): with kb.connect() as conn: a = kb.create_task(conn, title="a") b = kb.create_task(conn, title="b") c = kb.create_task(conn, title="c", parents=[a, b]) kb.complete_task(conn, a) assert kb.get_task(conn, c).status == "todo" kb.complete_task(conn, b) assert kb.get_task(conn, c).status == "ready" # --------------------------------------------------------------------------- # Atomic claim (CAS) # --------------------------------------------------------------------------- def test_claim_once_wins_second_loses(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x", assignee="a") first = kb.claim_task(conn, t, claimer="host:1") assert first is not None and first.status == "running" second = kb.claim_task(conn, t, claimer="host:2") assert second is None def test_claim_fails_on_non_ready(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x") # Move to todo by introducing an unsatisfied parent. p = kb.create_task(conn, title="p") kb.link_tasks(conn, p, t) assert kb.get_task(conn, t).status == "todo" assert kb.claim_task(conn, t) is None def test_stale_claim_reclaimed(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x", assignee="a") kb.claim_task(conn, t) # Rewind claim_expires so it looks stale. conn.execute( "UPDATE tasks SET claim_expires = ? 
WHERE id = ?", (int(time.time()) - 3600, t), ) reclaimed = kb.release_stale_claims(conn) assert reclaimed == 1 assert kb.get_task(conn, t).status == "ready" def test_heartbeat_extends_claim(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x", assignee="a") claimer = "host:hb" kb.claim_task(conn, t, claimer=claimer, ttl_seconds=60) original = kb.get_task(conn, t).claim_expires # Rewind then heartbeat. conn.execute("UPDATE tasks SET claim_expires = ? WHERE id = ?", (0, t)) ok = kb.heartbeat_claim(conn, t, claimer=claimer, ttl_seconds=3600) assert ok new = kb.get_task(conn, t).claim_expires assert new > int(time.time()) + 3000 def test_concurrent_claims_only_one_wins(kanban_home): """Fire N threads claiming the same task; exactly one must win.""" with kb.connect() as conn: t = kb.create_task(conn, title="race", assignee="a") def attempt(i): with kb.connect() as c: return kb.claim_task(c, t, claimer=f"host:{i}") n_workers = 8 with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as ex: results = list(ex.map(attempt, range(n_workers))) winners = [r for r in results if r is not None] assert len(winners) == 1 assert winners[0].status == "running" # --------------------------------------------------------------------------- # Complete / block / unblock / archive / assign # --------------------------------------------------------------------------- def test_complete_records_result(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x") assert kb.complete_task(conn, t, result="done and dusted") task = kb.get_task(conn, t) assert task.status == "done" assert task.result == "done and dusted" assert task.completed_at is not None def test_block_then_unblock(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x", assignee="a") kb.claim_task(conn, t) assert kb.block_task(conn, t, reason="need input") assert kb.get_task(conn, t).status == "blocked" assert kb.unblock_task(conn, t) assert kb.get_task(conn, 
t).status == "ready" def test_assign_refuses_while_running(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x", assignee="a") kb.claim_task(conn, t) with pytest.raises(RuntimeError, match="currently running"): kb.assign_task(conn, t, "b") def test_assign_reassigns_when_not_running(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x", assignee="a") assert kb.assign_task(conn, t, "b") assert kb.get_task(conn, t).assignee == "b" def test_archive_hides_from_default_list(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x") kb.complete_task(conn, t) assert kb.archive_task(conn, t) assert len(kb.list_tasks(conn)) == 0 assert len(kb.list_tasks(conn, include_archived=True)) == 1 # --------------------------------------------------------------------------- # Comments / events / worker context # --------------------------------------------------------------------------- def test_comments_recorded_in_order(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x") kb.add_comment(conn, t, "user", "first") kb.add_comment(conn, t, "researcher", "second") comments = kb.list_comments(conn, t) assert [c.body for c in comments] == ["first", "second"] assert [c.author for c in comments] == ["user", "researcher"] def test_empty_comment_rejected(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x") with pytest.raises(ValueError, match="body is required"): kb.add_comment(conn, t, "user", "") def test_events_capture_lifecycle(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x", assignee="a") kb.claim_task(conn, t) kb.complete_task(conn, t, result="ok") events = kb.list_events(conn, t) kinds = [e.kind for e in events] assert "created" in kinds assert "claimed" in kinds assert "completed" in kinds def test_worker_context_includes_parent_results_and_comments(kanban_home): with kb.connect() as conn: p = kb.create_task(conn, title="p") kb.complete_task(conn, 
p, result="PARENT_RESULT_MARKER") c = kb.create_task(conn, title="child", parents=[p]) kb.add_comment(conn, c, "user", "CLARIFICATION_MARKER") ctx = kb.build_worker_context(conn, c) assert "PARENT_RESULT_MARKER" in ctx assert "CLARIFICATION_MARKER" in ctx assert c in ctx assert "child" in ctx # --------------------------------------------------------------------------- # Dispatcher # --------------------------------------------------------------------------- def test_dispatch_dry_run_does_not_claim(kanban_home): with kb.connect() as conn: t1 = kb.create_task(conn, title="a", assignee="alice") t2 = kb.create_task(conn, title="b", assignee="bob") res = kb.dispatch_once(conn, dry_run=True) assert {s[0] for s in res.spawned} == {t1, t2} with kb.connect() as conn: # Dry run must NOT mutate status. assert kb.get_task(conn, t1).status == "ready" assert kb.get_task(conn, t2).status == "ready" def test_dispatch_skips_unassigned(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="floater") res = kb.dispatch_once(conn, dry_run=True) assert t in res.skipped_unassigned assert not res.spawned def test_dispatch_promotes_ready_and_spawns(kanban_home): spawns = [] def fake_spawn(task, workspace): spawns.append((task.id, task.assignee, workspace)) with kb.connect() as conn: p = kb.create_task(conn, title="p", assignee="alice") c = kb.create_task(conn, title="c", assignee="bob", parents=[p]) # Finish parent outside dispatch; promotion happens inside. kb.complete_task(conn, p) res = kb.dispatch_once(conn, spawn_fn=fake_spawn) # Spawned c (a was already done when dispatch was called). 
assert len(spawns) == 1 assert spawns[0][0] == c assert spawns[0][1] == "bob" # c is now running with kb.connect() as conn: assert kb.get_task(conn, c).status == "running" def test_dispatch_spawn_failure_releases_claim(kanban_home): def boom(task, workspace): raise RuntimeError("spawn failed") with kb.connect() as conn: t = kb.create_task(conn, title="boom", assignee="alice") kb.dispatch_once(conn, spawn_fn=boom) # Must return to ready so the next tick can retry. assert kb.get_task(conn, t).status == "ready" assert kb.get_task(conn, t).claim_lock is None def test_dispatch_reclaims_stale_before_spawning(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x", assignee="alice") kb.claim_task(conn, t) conn.execute( "UPDATE tasks SET claim_expires = ? WHERE id = ?", (int(time.time()) - 1, t), ) res = kb.dispatch_once(conn, dry_run=True) assert res.reclaimed == 1 # --------------------------------------------------------------------------- # Workspace resolution # --------------------------------------------------------------------------- def test_scratch_workspace_created_under_hermes_home(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="x") task = kb.get_task(conn, t) ws = kb.resolve_workspace(task) assert ws.exists() assert ws.is_dir() assert "kanban" in str(ws) def test_dir_workspace_honors_given_path(kanban_home, tmp_path): target = tmp_path / "my-vault" with kb.connect() as conn: t = kb.create_task( conn, title="biz", workspace_kind="dir", workspace_path=str(target) ) task = kb.get_task(conn, t) ws = kb.resolve_workspace(task) assert ws == target assert ws.exists() def test_worktree_workspace_returns_intended_path(kanban_home, tmp_path): target = str(tmp_path / ".worktrees" / "my-task") with kb.connect() as conn: t = kb.create_task( conn, title="ship", workspace_kind="worktree", workspace_path=target ) task = kb.get_task(conn, t) ws = kb.resolve_workspace(task) # We do NOT auto-create worktrees; the worker's skill 
handles that. assert str(ws) == target # --------------------------------------------------------------------------- # Tenancy # --------------------------------------------------------------------------- def test_tenant_column_filters_listings(kanban_home): with kb.connect() as conn: kb.create_task(conn, title="a1", tenant="biz-a") kb.create_task(conn, title="b1", tenant="biz-b") kb.create_task(conn, title="shared") # no tenant biz_a = kb.list_tasks(conn, tenant="biz-a") biz_b = kb.list_tasks(conn, tenant="biz-b") assert [t.title for t in biz_a] == ["a1"] assert [t.title for t in biz_b] == ["b1"] def test_tenant_propagates_to_events(kanban_home): with kb.connect() as conn: t = kb.create_task(conn, title="tenant-task", tenant="biz-a") events = kb.list_events(conn, t) # The "created" event should have tenant in its payload. created = [e for e in events if e.kind == "created"] assert created and created[0].payload.get("tenant") == "biz-a" # --------------------------------------------------------------------------- # Shared-board path resolution (issue #19348) # # The kanban board is a cross-profile coordination primitive: a worker # spawned with `hermes -p ` must read/write the same kanban.db # as the dispatcher that claimed the task. These tests exercise the # path-resolution layer directly and would have caught the regression # where `kanban_db_path()` resolved to the active profile's HERMES_HOME. 
# ---------------------------------------------------------------------------


class TestSharedBoardPaths:
    """`kanban_home`/`kanban_db_path`/`workspaces_root`/`worker_log_path`
    must anchor at the **shared root**, not the active profile's HERMES_HOME."""

    # Environment variables that override path resolution. Each test must
    # start with these unset, otherwise a value inherited from the
    # environment running the tests would change the resolved paths.
    _OVERRIDE_ENV_VARS = (
        "HERMES_KANBAN_HOME",
        "HERMES_KANBAN_DB",
        "HERMES_KANBAN_WORKSPACES_ROOT",
    )

    def _set_home(self, monkeypatch, tmp_path, hermes_home):
        """Set up a clean path-resolution environment for one test.

        Redirects ``Path.home()`` to ``tmp_path``, sets ``HERMES_HOME`` to
        ``hermes_home`` and removes every kanban override variable. Tests
        that exercise an override set it *after* calling this helper.
        """
        monkeypatch.setattr(Path, "home", lambda: tmp_path)
        monkeypatch.setenv("HERMES_HOME", str(hermes_home))
        for name in self._OVERRIDE_ENV_VARS:
            monkeypatch.delenv(name, raising=False)

    def test_default_install_anchors_at_home_dot_hermes(
        self, tmp_path, monkeypatch
    ):
        # Standard install: HERMES_HOME == ~/.hermes, no profile active.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        self._set_home(monkeypatch, tmp_path, default_home)

        assert kb.kanban_home() == default_home
        assert kb.kanban_db_path() == default_home / "kanban.db"
        assert kb.workspaces_root() == default_home / "kanban" / "workspaces"
        assert (
            kb.worker_log_path("t_demo")
            == default_home / "kanban" / "logs" / "t_demo.log"
        )

    def test_profile_worker_resolves_to_shared_root(
        self, tmp_path, monkeypatch
    ):
        # Reproduces the bug: dispatcher uses ~/.hermes/kanban.db,
        # worker spawned with -p <name> previously resolved to
        # ~/.hermes/profiles/<name>/kanban.db. After the fix both
        # converge on ~/.hermes/kanban.db.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        profile_home = default_home / "profiles" / "nehemiahkanban"
        profile_home.mkdir(parents=True)
        self._set_home(monkeypatch, tmp_path, profile_home)

        # All four resolvers must anchor at the shared root, not the
        # profile-local HERMES_HOME.
        assert kb.kanban_home() == default_home
        assert kb.kanban_db_path() == default_home / "kanban.db"
        assert kb.workspaces_root() == default_home / "kanban" / "workspaces"
        assert (
            kb.worker_log_path("t_0d214f19")
            == default_home / "kanban" / "logs" / "t_0d214f19.log"
        )
        # Sanity: the profile-local path that used to be returned is
        # explicitly NOT what we resolve to anymore.
        assert kb.kanban_db_path() != profile_home / "kanban.db"

    def test_dispatcher_and_profile_worker_converge(
        self, tmp_path, monkeypatch
    ):
        # End-to-end convergence: resolve the path under each side's
        # HERMES_HOME and confirm equality. This is the property the
        # dispatcher/worker handoff actually depends on.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        profile_home = default_home / "profiles" / "coder"
        profile_home.mkdir(parents=True)

        # Dispatcher's perspective.
        self._set_home(monkeypatch, tmp_path, default_home)
        dispatcher_db = kb.kanban_db_path()
        dispatcher_ws = kb.workspaces_root()
        dispatcher_log = kb.worker_log_path("t_handoff")

        # Worker's perspective (profile activated by `hermes -p coder`).
        monkeypatch.setenv("HERMES_HOME", str(profile_home))
        worker_db = kb.kanban_db_path()
        worker_ws = kb.workspaces_root()
        worker_log = kb.worker_log_path("t_handoff")

        assert dispatcher_db == worker_db
        assert dispatcher_ws == worker_ws
        assert dispatcher_log == worker_log

    def test_docker_custom_hermes_home_uses_env_path_directly(
        self, tmp_path, monkeypatch
    ):
        # Docker / custom deployment: HERMES_HOME points outside ~/.hermes.
        # `get_default_hermes_root()` returns env_home directly when it
        # is not a `<root>/profiles/<name>` shape and not under
        # `Path.home() / ".hermes"`.
        custom_root = tmp_path / "opt" / "hermes"
        custom_root.mkdir(parents=True)
        self._set_home(monkeypatch, tmp_path, custom_root)

        assert kb.kanban_home() == custom_root
        assert kb.kanban_db_path() == custom_root / "kanban.db"

    def test_docker_profile_layout_uses_grandparent(
        self, tmp_path, monkeypatch
    ):
        # Docker profile shape: HERMES_HOME=/opt/hermes/profiles/coder;
        # `get_default_hermes_root()` walks up to /opt/hermes because
        # the immediate parent dir is named "profiles".
        custom_root = tmp_path / "opt" / "hermes"
        profile = custom_root / "profiles" / "coder"
        profile.mkdir(parents=True)
        self._set_home(monkeypatch, tmp_path, profile)

        assert kb.kanban_home() == custom_root
        assert kb.kanban_db_path() == custom_root / "kanban.db"

    def test_explicit_override_via_hermes_kanban_home(
        self, tmp_path, monkeypatch
    ):
        # Explicit override: HERMES_KANBAN_HOME beats every other
        # resolution rule.
        default_home = tmp_path / ".hermes"
        profile_home = default_home / "profiles" / "any"
        profile_home.mkdir(parents=True)
        override = tmp_path / "shared-board"
        override.mkdir()

        # Start from a clean environment, then apply the override under test.
        self._set_home(monkeypatch, tmp_path, profile_home)
        monkeypatch.setenv("HERMES_KANBAN_HOME", str(override))

        assert kb.kanban_home() == override
        assert kb.kanban_db_path() == override / "kanban.db"
        assert kb.workspaces_root() == override / "kanban" / "workspaces"

    def test_empty_override_falls_through(self, tmp_path, monkeypatch):
        # Empty/whitespace override is treated as unset.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()

        self._set_home(monkeypatch, tmp_path, default_home)
        monkeypatch.setenv("HERMES_KANBAN_HOME", "   ")

        assert kb.kanban_home() == default_home

    def test_dispatcher_and_worker_share_a_real_database(
        self, tmp_path, monkeypatch
    ):
        # Belt-and-suspenders: round-trip a task across the two
        # HERMES_HOME perspectives via a real SQLite file. Without the
        # fix the worker would open a different file and see no rows.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        profile_home = default_home / "profiles" / "nehemiahkanban"
        profile_home.mkdir(parents=True)

        # Dispatcher creates the board and a task.
        self._set_home(monkeypatch, tmp_path, default_home)
        kb.init_db()
        with kb.connect() as conn:
            task_id = kb.create_task(conn, title="cross-profile")

        # Worker switches to the profile HERMES_HOME and reads.
        monkeypatch.setenv("HERMES_HOME", str(profile_home))
        with kb.connect() as conn:
            task = kb.get_task(conn, task_id)
        assert task is not None
        assert task.title == "cross-profile"

    def test_hermes_kanban_db_pin_beats_kanban_home(
        self, tmp_path, monkeypatch
    ):
        # HERMES_KANBAN_DB pins the file path directly and beats both
        # HERMES_KANBAN_HOME and the `get_default_hermes_root()` path.
        # This is the env the dispatcher injects into workers.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        umbrella = tmp_path / "umbrella"
        umbrella.mkdir()
        pinned_db = tmp_path / "pinned" / "board.db"
        pinned_db.parent.mkdir()

        self._set_home(monkeypatch, tmp_path, default_home)
        monkeypatch.setenv("HERMES_KANBAN_HOME", str(umbrella))
        monkeypatch.setenv("HERMES_KANBAN_DB", str(pinned_db))

        assert kb.kanban_db_path() == pinned_db
        # workspaces_root still follows HERMES_KANBAN_HOME -- the pins
        # are independent.
        assert kb.workspaces_root() == umbrella / "kanban" / "workspaces"

    def test_hermes_kanban_workspaces_root_pin_beats_kanban_home(
        self, tmp_path, monkeypatch
    ):
        # HERMES_KANBAN_WORKSPACES_ROOT pins the workspaces root directly.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        umbrella = tmp_path / "umbrella"
        umbrella.mkdir()
        pinned_ws = tmp_path / "pinned-workspaces"
        pinned_ws.mkdir()

        self._set_home(monkeypatch, tmp_path, default_home)
        monkeypatch.setenv("HERMES_KANBAN_HOME", str(umbrella))
        monkeypatch.setenv("HERMES_KANBAN_WORKSPACES_ROOT", str(pinned_ws))

        assert kb.workspaces_root() == pinned_ws
        # kanban_db_path still follows HERMES_KANBAN_HOME.
        assert kb.kanban_db_path() == umbrella / "kanban.db"

    def test_empty_per_path_overrides_fall_through(
        self, tmp_path, monkeypatch
    ):
        # Empty/whitespace pins are treated as unset, same as
        # HERMES_KANBAN_HOME.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()

        self._set_home(monkeypatch, tmp_path, default_home)
        monkeypatch.setenv("HERMES_KANBAN_DB", "   ")
        monkeypatch.setenv("HERMES_KANBAN_WORKSPACES_ROOT", "")

        assert kb.kanban_db_path() == default_home / "kanban.db"
        assert kb.workspaces_root() == default_home / "kanban" / "workspaces"

    def test_dispatcher_spawn_injects_kanban_db_and_workspaces_root(
        self, tmp_path, monkeypatch
    ):
        # The dispatcher's `_default_spawn` must inject HERMES_KANBAN_DB
        # and HERMES_KANBAN_WORKSPACES_ROOT into the worker env so the
        # worker converges on the dispatcher's paths even when the
        # `-p <name>` flag rewrites HERMES_HOME.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        self._set_home(monkeypatch, tmp_path, default_home)

        captured = {}

        class _FakePopen:
            # Stand-in for subprocess.Popen that records the command and
            # environment it was given instead of starting a process.
            def __init__(self, cmd, **kwargs):
                captured["cmd"] = cmd
                captured["env"] = kwargs.get("env", {})
                self.pid = 4242

        monkeypatch.setattr("subprocess.Popen", _FakePopen)

        task = kb.Task(
            id="t_dispatch_env",
            title="x",
            body=None,
            assignee="coder",
            status="ready",
            priority=0,
            created_by=None,
            created_at=0,
            started_at=None,
            completed_at=None,
            workspace_kind="scratch",
            workspace_path=None,
            claim_lock=None,
            claim_expires=None,
            tenant=None,
        )
        kb._default_spawn(task, str(tmp_path / "ws"))

        env = captured["env"]
        assert env["HERMES_KANBAN_DB"] == str(default_home / "kanban.db")
        assert env["HERMES_KANBAN_WORKSPACES_ROOT"] == str(
            default_home / "kanban" / "workspaces"
        )
        assert env["HERMES_KANBAN_TASK"] == "t_dispatch_env"