hermes-agent/tests/hermes_cli/test_kanban_db.py
teknium1 2658494e81 fix(kanban): add per-path env overrides + dispatcher env injection
Layers defense-in-depth on top of the shared-root anchoring (base commit).

Changes in hermes_cli/kanban_db.py:
- kanban_db_path() now honours HERMES_KANBAN_DB first, then falls through
  to kanban_home()/kanban.db.
- workspaces_root() now honours HERMES_KANBAN_WORKSPACES_ROOT first, then
  falls through to kanban_home()/kanban/workspaces.
- All three overrides (HERMES_KANBAN_HOME, HERMES_KANBAN_DB,
  HERMES_KANBAN_WORKSPACES_ROOT) now call .expanduser() for consistency.
- _default_spawn() injects HERMES_KANBAN_DB and
  HERMES_KANBAN_WORKSPACES_ROOT into the worker subprocess env. Even
  when the worker's get_default_hermes_root() resolution somehow
  disagrees with the dispatcher's (symlinks, unusual Docker layouts),
  the two processes still open the same SQLite file.

Module docstring updated to describe all three overrides and the
dispatcher env-injection contract.

Tests (tests/hermes_cli/test_kanban_db.py, TestSharedBoardPaths):
- test_hermes_kanban_db_pin_beats_kanban_home
- test_hermes_kanban_workspaces_root_pin_beats_kanban_home
- test_empty_per_path_overrides_fall_through
- test_dispatcher_spawn_injects_kanban_db_and_workspaces_root
  (monkeypatches subprocess.Popen, asserts both env vars reach the
  child even after HERMES_HOME is rewritten by `hermes -p <profile>`.)

Docs: website/docs/reference/environment-variables.md gets entries
for the three kanban env vars.

This fusion is built on the cleanest of the seven competing PRs that
targeted issue #18442:

* Base commit (from PR #19350 by @GodsBoy): add `kanban_home()` helper
  anchored at `get_default_hermes_root()`, reroute all 5 kanban path
  sites through it (including the 3 sibling log-dir sites that the
  other six PRs missed), 8-test regression class.
* Dispatcher env-var injection approach drawn from PRs #18300
  (@quocanh261997) and #19100 (@cg2aigc).
* Per-path env overrides drawn from PR #19100 (@cg2aigc).
* get_default_hermes_root() resolution direction first proposed in
  PR #18503 (@beibi9966) and PR #18985 (@Gosuj).

Closes the duplicate/competing PRs: #18300, #18503, #18670, #18985,
#19037, #19056, #19100. Fixes #18442 and #19348.

Co-authored-by: quocanh261997 <17986614+quocanh261997@users.noreply.github.com>
Co-authored-by: cg2aigc <232694053+cg2aigc@users.noreply.github.com>
Co-authored-by: beibi9966 <beibei1988@proton.me>
Co-authored-by: Gosuj <123411271+Gosuj@users.noreply.github.com>
Co-authored-by: LeonSGP43 <154585401+LeonSGP43@users.noreply.github.com>
2026-05-03 15:13:39 -07:00

714 lines
27 KiB
Python

"""Tests for the Kanban DB layer (hermes_cli.kanban_db)."""
from __future__ import annotations
import concurrent.futures
import os
import time
from pathlib import Path
import pytest
from hermes_cli import kanban_db as kb
@pytest.fixture
def kanban_home(tmp_path, monkeypatch):
    """Isolated HERMES_HOME with an empty kanban DB.

    Points ``HERMES_HOME`` and ``Path.home()`` at a temp directory, removes
    any kanban path overrides inherited from the outer environment, then
    initialises the board. Returns the temp ``.hermes`` directory.
    """
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    # The kanban resolvers honour these env pins ahead of HERMES_HOME; a
    # value leaking in from the invoking shell (or from a dispatcher that
    # spawned the test run) would aim init_db() at a board outside tmp_path.
    for override in (
        "HERMES_KANBAN_HOME",
        "HERMES_KANBAN_DB",
        "HERMES_KANBAN_WORKSPACES_ROOT",
    ):
        monkeypatch.delenv(override, raising=False)
    kb.init_db()
    return home
# ---------------------------------------------------------------------------
# Schema / init
# ---------------------------------------------------------------------------
def test_init_db_is_idempotent(kanban_home):
    """Re-running ``init_db`` must neither raise nor discard existing rows."""
    with kb.connect() as db:
        kb.create_task(db, title="persisted")

    # Second initialisation on top of an already populated board.
    kb.init_db()

    with kb.connect() as db:
        surviving = kb.list_tasks(db)
    assert [task.title for task in surviving] == ["persisted"]
def test_init_creates_expected_tables(kanban_home):
    """The initialised schema contains the four core kanban tables."""
    required = {"tasks", "task_links", "task_comments", "task_events"}
    query = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    with kb.connect() as db:
        present = {row["name"] for row in db.execute(query).fetchall()}
    assert required.issubset(present)
# ---------------------------------------------------------------------------
# Task creation + status inference
# ---------------------------------------------------------------------------
def test_create_task_no_parents_is_ready(kanban_home):
    """A task created without parents is ``ready`` with a scratch workspace."""
    with kb.connect() as db:
        new_id = kb.create_task(db, title="ship it", assignee="alice")
        created = kb.get_task(db, new_id)
        assert created is not None
        observed = (created.status, created.assignee, created.workspace_kind)
        assert observed == ("ready", "alice", "scratch")
def test_create_task_with_parent_is_todo_until_parent_done(kanban_home):
    """A child is ``todo`` until its parent completes, then becomes ``ready``."""
    with kb.connect() as db:
        parent_id = kb.create_task(db, title="parent")
        child_id = kb.create_task(db, title="child", parents=[parent_id])

        before = kb.get_task(db, child_id)
        assert before.status == "todo"

        kb.complete_task(db, parent_id, result="ok")

        after = kb.get_task(db, child_id)
        assert after.status == "ready"
def test_create_task_unknown_parent_errors(kanban_home):
    """Naming a parent id that does not exist raises ``ValueError``."""
    with kb.connect() as db:
        with pytest.raises(ValueError, match="unknown parent"):
            kb.create_task(db, title="orphan", parents=["t_ghost"])
def test_workspace_kind_validation(kanban_home):
    """An unrecognised ``workspace_kind`` is rejected with ``ValueError``."""
    with kb.connect() as db:
        with pytest.raises(ValueError, match="workspace_kind"):
            kb.create_task(db, title="bad ws", workspace_kind="cloud")
# ---------------------------------------------------------------------------
# Links + dependency resolution
# ---------------------------------------------------------------------------
def test_link_demotes_ready_child_to_todo_when_parent_not_done(kanban_home):
    """Linking a ``ready`` task under an unfinished parent demotes it to ``todo``."""
    with kb.connect() as db:
        parent_id = kb.create_task(db, title="a")
        child_id = kb.create_task(db, title="b")

        pre_link = kb.get_task(db, child_id)
        assert pre_link.status == "ready"

        kb.link_tasks(db, parent_id, child_id)

        post_link = kb.get_task(db, child_id)
        assert post_link.status == "todo"
def test_link_keeps_ready_child_when_parent_already_done(kanban_home):
    """Linking under an already completed parent leaves the child ``ready``."""
    with kb.connect() as db:
        parent_id = kb.create_task(db, title="a")
        kb.complete_task(db, parent_id)
        child_id = kb.create_task(db, title="b")

        pre_link = kb.get_task(db, child_id)
        assert pre_link.status == "ready"

        kb.link_tasks(db, parent_id, child_id)

        post_link = kb.get_task(db, child_id)
        assert post_link.status == "ready"
def test_link_rejects_self_loop(kanban_home):
    """A task cannot be linked as its own parent."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="a")
        with pytest.raises(ValueError, match="itself"):
            kb.link_tasks(db, task_id, task_id)
def test_link_detects_cycle(kanban_home):
    """Links that would close a loop in the chain a -> b -> c are refused."""
    with kb.connect() as db:
        first = kb.create_task(db, title="a")
        second = kb.create_task(db, title="b", parents=[first])
        third = kb.create_task(db, title="c", parents=[second])

        # Both the long back-edge (c -> a) and the short one (b -> a).
        for parent_id, child_id in ((third, first), (second, first)):
            with pytest.raises(ValueError, match="cycle"):
                kb.link_tasks(db, parent_id, child_id)
def test_recompute_ready_cascades_through_chain(kanban_home):
    """Completing each link of a three-task chain readies the next one."""
    with kb.connect() as db:
        head = kb.create_task(db, title="a")
        middle = kb.create_task(db, title="b", parents=[head])
        tail = kb.create_task(db, title="c", parents=[middle])

        initial = [kb.get_task(db, tid).status for tid in (head, middle, tail)]
        assert initial == ["ready", "todo", "todo"]

        kb.complete_task(db, head)
        middle_task = kb.get_task(db, middle)
        assert middle_task.status == "ready"

        kb.complete_task(db, middle)
        tail_task = kb.get_task(db, tail)
        assert tail_task.status == "ready"
def test_recompute_ready_fan_in_waits_for_all_parents(kanban_home):
    """A task with two parents becomes ``ready`` only after both are done."""
    with kb.connect() as db:
        left = kb.create_task(db, title="a")
        right = kb.create_task(db, title="b")
        joined = kb.create_task(db, title="c", parents=[left, right])

        kb.complete_task(db, left)
        half_done = kb.get_task(db, joined)
        assert half_done.status == "todo"

        kb.complete_task(db, right)
        all_done = kb.get_task(db, joined)
        assert all_done.status == "ready"
# ---------------------------------------------------------------------------
# Atomic claim (CAS)
# ---------------------------------------------------------------------------
def test_claim_once_wins_second_loses(kanban_home):
    """The first claim moves the task to ``running``; a second claim gets ``None``."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="a")

        winner = kb.claim_task(db, task_id, claimer="host:1")
        assert winner is not None
        assert winner.status == "running"

        loser = kb.claim_task(db, task_id, claimer="host:2")
        assert loser is None
def test_claim_fails_on_non_ready(kanban_home):
    """Claiming a task that is not ``ready`` returns ``None``."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x")
        # An unfinished parent pushes the task back to ``todo``.
        blocker_id = kb.create_task(db, title="p")
        kb.link_tasks(db, blocker_id, task_id)

        demoted = kb.get_task(db, task_id)
        assert demoted.status == "todo"

        attempt = kb.claim_task(db, task_id)
        assert attempt is None
def test_stale_claim_reclaimed(kanban_home):
    """A claim whose expiry lies in the past is released back to ``ready``."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="a")
        kb.claim_task(db, task_id)

        # Backdate the expiry by an hour so the claim looks abandoned.
        expired_at = int(time.time()) - 3600
        db.execute(
            "UPDATE tasks SET claim_expires = ? WHERE id = ?",
            (expired_at, task_id),
        )

        released = kb.release_stale_claims(db)
        assert released == 1

        task = kb.get_task(db, task_id)
        assert task.status == "ready"
def test_heartbeat_extends_claim(kanban_home):
    """A heartbeat from the claimer pushes ``claim_expires`` into the future.

    The task is claimed with a 60-second TTL, its expiry is forced to 0,
    and a heartbeat with a 3600-second TTL must succeed and leave the
    expiry more than 3000 seconds ahead of the current time.
    """
    with kb.connect() as conn:
        t = kb.create_task(conn, title="x", assignee="a")
        claimer = "host:hb"
        kb.claim_task(conn, t, claimer=claimer, ttl_seconds=60)
        # Rewind then heartbeat.
        conn.execute("UPDATE tasks SET claim_expires = ? WHERE id = ?", (0, t))
        ok = kb.heartbeat_claim(conn, t, claimer=claimer, ttl_seconds=3600)
        assert ok
        new = kb.get_task(conn, t).claim_expires
        # The 3000s margin cannot be met by the original 60s TTL, so this
        # passes only if the heartbeat actually extended the claim.
        assert new > int(time.time()) + 3000
def test_concurrent_claims_only_one_wins(kanban_home):
    """Race several threads for the same task; exactly one claim may succeed."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="race", assignee="a")

    def _try_claim(worker_index):
        # Every contender opens its own connection.
        with kb.connect() as worker_db:
            return kb.claim_task(
                worker_db, task_id, claimer=f"host:{worker_index}"
            )

    contenders = 8
    with concurrent.futures.ThreadPoolExecutor(max_workers=contenders) as pool:
        pending = [pool.submit(_try_claim, idx) for idx in range(contenders)]
        outcomes = [future.result() for future in pending]

    successful = [claim for claim in outcomes if claim is not None]
    assert len(successful) == 1
    assert successful[0].status == "running"
# ---------------------------------------------------------------------------
# Complete / block / unblock / archive / assign
# ---------------------------------------------------------------------------
def test_complete_records_result(kanban_home):
    """Completing a task stores the result text and a completion timestamp."""
    outcome = "done and dusted"
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x")
        completed = kb.complete_task(db, task_id, result=outcome)
        assert completed

        finished = kb.get_task(db, task_id)
        assert (finished.status, finished.result) == ("done", "done and dusted")
        assert finished.completed_at is not None
def test_block_then_unblock(kanban_home):
    """A claimed task can be blocked, and unblocking returns it to ``ready``."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="a")
        kb.claim_task(db, task_id)

        was_blocked = kb.block_task(db, task_id, reason="need input")
        assert was_blocked
        blocked = kb.get_task(db, task_id)
        assert blocked.status == "blocked"

        was_unblocked = kb.unblock_task(db, task_id)
        assert was_unblocked
        resumed = kb.get_task(db, task_id)
        assert resumed.status == "ready"
def test_assign_refuses_while_running(kanban_home):
    """Reassigning a task that has been claimed raises ``RuntimeError``."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="a")
        kb.claim_task(db, task_id)
        expect_refusal = pytest.raises(RuntimeError, match="currently running")
        with expect_refusal:
            kb.assign_task(db, task_id, "b")
def test_assign_reassigns_when_not_running(kanban_home):
    """An unclaimed task can be handed to a different assignee."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="a")
        reassigned = kb.assign_task(db, task_id, "b")
        assert reassigned
        current = kb.get_task(db, task_id)
        assert current.assignee == "b"
def test_archive_hides_from_default_list(kanban_home):
    """Archived tasks are listed only when ``include_archived=True``."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x")
        kb.complete_task(db, task_id)
        archived = kb.archive_task(db, task_id)
        assert archived

        default_view = kb.list_tasks(db)
        assert len(default_view) == 0

        full_view = kb.list_tasks(db, include_archived=True)
        assert len(full_view) == 1
# ---------------------------------------------------------------------------
# Comments / events / worker context
# ---------------------------------------------------------------------------
def test_comments_recorded_in_order(kanban_home):
    """Comments are returned in insertion order with their authors."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x")
        kb.add_comment(db, task_id, "user", "first")
        kb.add_comment(db, task_id, "researcher", "second")
        thread = kb.list_comments(db, task_id)

    bodies = [entry.body for entry in thread]
    authors = [entry.author for entry in thread]
    assert bodies == ["first", "second"]
    assert authors == ["user", "researcher"]
def test_empty_comment_rejected(kanban_home):
    """A comment with an empty body is refused with ``ValueError``."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x")
        expect_rejection = pytest.raises(ValueError, match="body is required")
        with expect_rejection:
            kb.add_comment(db, task_id, "user", "")
def test_events_capture_lifecycle(kanban_home):
    """Create, claim and complete each leave an event of the matching kind."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="a")
        kb.claim_task(db, task_id)
        kb.complete_task(db, task_id, result="ok")
        history = kb.list_events(db, task_id)

    recorded = [event.kind for event in history]
    for expected_kind in ("created", "claimed", "completed"):
        assert expected_kind in recorded
def test_worker_context_includes_parent_results_and_comments(kanban_home):
    """Worker context carries the parent result, comments, task id and title."""
    with kb.connect() as db:
        parent_id = kb.create_task(db, title="p")
        kb.complete_task(db, parent_id, result="PARENT_RESULT_MARKER")
        child_id = kb.create_task(db, title="child", parents=[parent_id])
        kb.add_comment(db, child_id, "user", "CLARIFICATION_MARKER")
        context = kb.build_worker_context(db, child_id)

    expected_fragments = (
        "PARENT_RESULT_MARKER",
        "CLARIFICATION_MARKER",
        child_id,
        "child",
    )
    for fragment in expected_fragments:
        assert fragment in context
# ---------------------------------------------------------------------------
# Dispatcher
# ---------------------------------------------------------------------------
def test_dispatch_dry_run_does_not_claim(kanban_home):
    """A dry run reports both tasks as spawnable but leaves them ``ready``."""
    with kb.connect() as db:
        alice_task = kb.create_task(db, title="a", assignee="alice")
        bob_task = kb.create_task(db, title="b", assignee="bob")
        report = kb.dispatch_once(db, dry_run=True)

    reported_ids = {entry[0] for entry in report.spawned}
    assert reported_ids == {alice_task, bob_task}

    with kb.connect() as db:
        # A dry run is not allowed to change any status.
        for task_id in (alice_task, bob_task):
            assert kb.get_task(db, task_id).status == "ready"
def test_dispatch_skips_unassigned(kanban_home):
    """A task without an assignee is reported as skipped, not spawned."""
    with kb.connect() as db:
        floater_id = kb.create_task(db, title="floater")
        report = kb.dispatch_once(db, dry_run=True)

    assert floater_id in report.skipped_unassigned
    assert not report.spawned
def test_dispatch_promotes_ready_and_spawns(kanban_home):
    """Dispatch spawns the child of a completed parent and marks it running.

    A recording ``spawn_fn`` stands in for the real spawner so the test can
    check which task was handed over and to which assignee.
    """
    spawns = []

    def fake_spawn(task, workspace):
        spawns.append((task.id, task.assignee, workspace))

    with kb.connect() as conn:
        p = kb.create_task(conn, title="p", assignee="alice")
        c = kb.create_task(conn, title="c", assignee="bob", parents=[p])
        # Finish parent outside dispatch; promotion happens inside.
        kb.complete_task(conn, p)
        kb.dispatch_once(conn, spawn_fn=fake_spawn)

    # Only c is spawned (p was already done when dispatch was called).
    assert len(spawns) == 1
    assert spawns[0][0] == c
    assert spawns[0][1] == "bob"

    # c is now running
    with kb.connect() as conn:
        assert kb.get_task(conn, c).status == "running"
def test_dispatch_spawn_failure_releases_claim(kanban_home):
    """When the spawn function raises, the task is ``ready`` again and unlocked."""

    def _failing_spawn(task, workspace):
        raise RuntimeError("spawn failed")

    with kb.connect() as db:
        task_id = kb.create_task(db, title="boom", assignee="alice")
        kb.dispatch_once(db, spawn_fn=_failing_spawn)

        # The task has to be claimable again on the next dispatch tick.
        status_after = kb.get_task(db, task_id).status
        assert status_after == "ready"
        lock_after = kb.get_task(db, task_id).claim_lock
        assert lock_after is None
def test_dispatch_reclaims_stale_before_spawning(kanban_home):
    """A dispatch pass reports one reclaimed task when a claim has expired."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="alice")
        kb.claim_task(db, task_id)

        # Backdate the expiry by one second so the claim counts as stale.
        just_expired = int(time.time()) - 1
        db.execute(
            "UPDATE tasks SET claim_expires = ? WHERE id = ?",
            (just_expired, task_id),
        )

        report = kb.dispatch_once(db, dry_run=True)
        assert report.reclaimed == 1
# ---------------------------------------------------------------------------
# Workspace resolution
# ---------------------------------------------------------------------------
def test_scratch_workspace_created_under_hermes_home(kanban_home):
    """Resolving a default task yields an existing directory on a kanban path."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x")
        record = kb.get_task(db, task_id)

    workspace = kb.resolve_workspace(record)
    assert workspace.exists()
    assert workspace.is_dir()
    assert "kanban" in str(workspace)
def test_dir_workspace_honors_given_path(kanban_home, tmp_path):
    """A ``dir`` workspace resolves to exactly the requested path, which exists."""
    requested = tmp_path / "my-vault"
    with kb.connect() as db:
        task_id = kb.create_task(
            db,
            title="biz",
            workspace_kind="dir",
            workspace_path=str(requested),
        )
        record = kb.get_task(db, task_id)

    workspace = kb.resolve_workspace(record)
    assert workspace == requested
    assert workspace.exists()
def test_worktree_workspace_returns_intended_path(kanban_home, tmp_path):
    """A ``worktree`` workspace resolves to the path it was created with."""
    requested = str(tmp_path / ".worktrees" / "my-task")
    with kb.connect() as db:
        task_id = kb.create_task(
            db,
            title="ship",
            workspace_kind="worktree",
            workspace_path=requested,
        )
        record = kb.get_task(db, task_id)

    workspace = kb.resolve_workspace(record)
    # Worktrees are NOT auto-created here; the worker's skill handles that,
    # so only the returned path is checked.
    assert str(workspace) == requested
# ---------------------------------------------------------------------------
# Tenancy
# ---------------------------------------------------------------------------
def test_tenant_column_filters_listings(kanban_home):
    """Listing by tenant returns only that tenant's tasks."""
    with kb.connect() as db:
        kb.create_task(db, title="a1", tenant="biz-a")
        kb.create_task(db, title="b1", tenant="biz-b")
        kb.create_task(db, title="shared")  # no tenant
        titles_by_tenant = {
            tenant: [task.title for task in kb.list_tasks(db, tenant=tenant)]
            for tenant in ("biz-a", "biz-b")
        }

    assert titles_by_tenant["biz-a"] == ["a1"]
    assert titles_by_tenant["biz-b"] == ["b1"]
def test_tenant_propagates_to_events(kanban_home):
    """The ``created`` event payload carries the task's tenant."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="tenant-task", tenant="biz-a")
        history = kb.list_events(db, task_id)

    # Only the first "created" event is inspected.
    creation_events = [event for event in history if event.kind == "created"]
    assert creation_events and creation_events[0].payload.get("tenant") == "biz-a"
# ---------------------------------------------------------------------------
# Shared-board path resolution (issue #19348)
#
# The kanban board is a cross-profile coordination primitive: a worker
# spawned with `hermes -p <profile>` must read/write the same kanban.db
# as the dispatcher that claimed the task. These tests exercise the
# path-resolution layer directly and would have caught the regression
# where `kanban_db_path()` resolved to the active profile's HERMES_HOME.
# ---------------------------------------------------------------------------
class TestSharedBoardPaths:
    """`kanban_home`/`kanban_db_path`/`workspaces_root`/`worker_log_path`
    must anchor at the **shared root**, not the active profile's HERMES_HOME.

    Every test starts from `_set_home`, which also strips all kanban env
    overrides so a pin inherited from the invoking environment cannot change
    the paths under test. Tests that exercise an override set it afterwards.
    """

    # Env vars the kanban path resolvers consult ahead of HERMES_HOME.
    _KANBAN_ENV_OVERRIDES = (
        "HERMES_KANBAN_HOME",
        "HERMES_KANBAN_DB",
        "HERMES_KANBAN_WORKSPACES_ROOT",
    )

    def _set_home(self, monkeypatch, tmp_path, hermes_home):
        """Point `Path.home()` at *tmp_path* and HERMES_HOME at *hermes_home*.

        All kanban override variables are removed so resolution starts from
        a known-clean environment.
        """
        monkeypatch.setattr(Path, "home", lambda: tmp_path)
        monkeypatch.setenv("HERMES_HOME", str(hermes_home))
        for name in self._KANBAN_ENV_OVERRIDES:
            monkeypatch.delenv(name, raising=False)

    def test_default_install_anchors_at_home_dot_hermes(
        self, tmp_path, monkeypatch
    ):
        # Standard install: HERMES_HOME == ~/.hermes, no profile active.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        self._set_home(monkeypatch, tmp_path, default_home)

        assert kb.kanban_home() == default_home
        assert kb.kanban_db_path() == default_home / "kanban.db"
        assert kb.workspaces_root() == default_home / "kanban" / "workspaces"
        assert (
            kb.worker_log_path("t_demo")
            == default_home / "kanban" / "logs" / "t_demo.log"
        )

    def test_profile_worker_resolves_to_shared_root(
        self, tmp_path, monkeypatch
    ):
        # Reproduces the bug: dispatcher uses ~/.hermes/kanban.db,
        # worker spawned with -p <profile> previously resolved to
        # ~/.hermes/profiles/<profile>/kanban.db. After the fix both
        # converge on ~/.hermes/kanban.db.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        profile_home = default_home / "profiles" / "nehemiahkanban"
        profile_home.mkdir(parents=True)
        self._set_home(monkeypatch, tmp_path, profile_home)

        # All four resolvers must anchor at the shared root, not the
        # profile-local HERMES_HOME.
        assert kb.kanban_home() == default_home
        assert kb.kanban_db_path() == default_home / "kanban.db"
        assert kb.workspaces_root() == default_home / "kanban" / "workspaces"
        assert (
            kb.worker_log_path("t_0d214f19")
            == default_home / "kanban" / "logs" / "t_0d214f19.log"
        )
        # Sanity: the profile-local path that used to be returned is
        # explicitly NOT what we resolve to anymore.
        assert kb.kanban_db_path() != profile_home / "kanban.db"

    def test_dispatcher_and_profile_worker_converge(
        self, tmp_path, monkeypatch
    ):
        # End-to-end convergence: resolve the path under each side's
        # HERMES_HOME and confirm equality. This is the property the
        # dispatcher/worker handoff actually depends on.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        profile_home = default_home / "profiles" / "coder"
        profile_home.mkdir(parents=True)

        # Dispatcher's perspective.
        self._set_home(monkeypatch, tmp_path, default_home)
        dispatcher_db = kb.kanban_db_path()
        dispatcher_ws = kb.workspaces_root()
        dispatcher_log = kb.worker_log_path("t_handoff")

        # Worker's perspective (profile activated by `hermes -p coder`).
        monkeypatch.setenv("HERMES_HOME", str(profile_home))
        worker_db = kb.kanban_db_path()
        worker_ws = kb.workspaces_root()
        worker_log = kb.worker_log_path("t_handoff")

        assert dispatcher_db == worker_db
        assert dispatcher_ws == worker_ws
        assert dispatcher_log == worker_log

    def test_docker_custom_hermes_home_uses_env_path_directly(
        self, tmp_path, monkeypatch
    ):
        # Docker / custom deployment: HERMES_HOME points outside ~/.hermes.
        # `get_default_hermes_root()` returns env_home directly when it
        # is not a `<root>/profiles/<name>` shape and not under
        # `Path.home() / ".hermes"`.
        custom_root = tmp_path / "opt" / "hermes"
        custom_root.mkdir(parents=True)
        self._set_home(monkeypatch, tmp_path, custom_root)

        assert kb.kanban_home() == custom_root
        assert kb.kanban_db_path() == custom_root / "kanban.db"

    def test_docker_profile_layout_uses_grandparent(
        self, tmp_path, monkeypatch
    ):
        # Docker profile shape: HERMES_HOME=/opt/hermes/profiles/coder;
        # `get_default_hermes_root()` walks up to /opt/hermes because
        # the immediate parent dir is named "profiles".
        custom_root = tmp_path / "opt" / "hermes"
        profile = custom_root / "profiles" / "coder"
        profile.mkdir(parents=True)
        self._set_home(monkeypatch, tmp_path, profile)

        assert kb.kanban_home() == custom_root
        assert kb.kanban_db_path() == custom_root / "kanban.db"

    def test_explicit_override_via_hermes_kanban_home(
        self, tmp_path, monkeypatch
    ):
        # Explicit override: HERMES_KANBAN_HOME beats every other
        # resolution rule.
        default_home = tmp_path / ".hermes"
        profile_home = default_home / "profiles" / "any"
        profile_home.mkdir(parents=True)
        override = tmp_path / "shared-board"
        override.mkdir()

        # Clean slate first, so an ambient per-path pin cannot outrank
        # the umbrella override being tested.
        self._set_home(monkeypatch, tmp_path, profile_home)
        monkeypatch.setenv("HERMES_KANBAN_HOME", str(override))

        assert kb.kanban_home() == override
        assert kb.kanban_db_path() == override / "kanban.db"
        assert kb.workspaces_root() == override / "kanban" / "workspaces"

    def test_empty_override_falls_through(self, tmp_path, monkeypatch):
        # Empty/whitespace override is treated as unset.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        self._set_home(monkeypatch, tmp_path, default_home)
        monkeypatch.setenv("HERMES_KANBAN_HOME", "   ")

        assert kb.kanban_home() == default_home

    def test_dispatcher_and_worker_share_a_real_database(
        self, tmp_path, monkeypatch
    ):
        # Belt-and-suspenders: round-trip a task across the two
        # HERMES_HOME perspectives via a real SQLite file. Without the
        # fix the worker would open a different file and see no rows.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        profile_home = default_home / "profiles" / "nehemiahkanban"
        profile_home.mkdir(parents=True)

        # Dispatcher creates the board and a task.
        self._set_home(monkeypatch, tmp_path, default_home)
        kb.init_db()
        with kb.connect() as conn:
            task_id = kb.create_task(conn, title="cross-profile")

        # Worker switches to the profile HERMES_HOME and reads.
        monkeypatch.setenv("HERMES_HOME", str(profile_home))
        with kb.connect() as conn:
            task = kb.get_task(conn, task_id)
        assert task is not None
        assert task.title == "cross-profile"

    def test_hermes_kanban_db_pin_beats_kanban_home(
        self, tmp_path, monkeypatch
    ):
        # HERMES_KANBAN_DB pins the file path directly and beats both
        # HERMES_KANBAN_HOME and the `get_default_hermes_root()` path.
        # This is the env the dispatcher injects into workers.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        umbrella = tmp_path / "umbrella"
        umbrella.mkdir()
        pinned_db = tmp_path / "pinned" / "board.db"
        pinned_db.parent.mkdir()

        self._set_home(monkeypatch, tmp_path, default_home)
        monkeypatch.setenv("HERMES_KANBAN_HOME", str(umbrella))
        monkeypatch.setenv("HERMES_KANBAN_DB", str(pinned_db))

        assert kb.kanban_db_path() == pinned_db
        # workspaces_root still follows HERMES_KANBAN_HOME -- the pins
        # are independent.
        assert kb.workspaces_root() == umbrella / "kanban" / "workspaces"

    def test_hermes_kanban_workspaces_root_pin_beats_kanban_home(
        self, tmp_path, monkeypatch
    ):
        # HERMES_KANBAN_WORKSPACES_ROOT pins the workspaces root directly.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        umbrella = tmp_path / "umbrella"
        umbrella.mkdir()
        pinned_ws = tmp_path / "pinned-workspaces"
        pinned_ws.mkdir()

        self._set_home(monkeypatch, tmp_path, default_home)
        monkeypatch.setenv("HERMES_KANBAN_HOME", str(umbrella))
        monkeypatch.setenv("HERMES_KANBAN_WORKSPACES_ROOT", str(pinned_ws))

        assert kb.workspaces_root() == pinned_ws
        # kanban_db_path still follows HERMES_KANBAN_HOME.
        assert kb.kanban_db_path() == umbrella / "kanban.db"

    def test_empty_per_path_overrides_fall_through(
        self, tmp_path, monkeypatch
    ):
        # Empty/whitespace pins are treated as unset, same as
        # HERMES_KANBAN_HOME.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()

        self._set_home(monkeypatch, tmp_path, default_home)
        monkeypatch.setenv("HERMES_KANBAN_DB", "   ")
        monkeypatch.setenv("HERMES_KANBAN_WORKSPACES_ROOT", "")

        assert kb.kanban_db_path() == default_home / "kanban.db"
        assert kb.workspaces_root() == default_home / "kanban" / "workspaces"

    def test_dispatcher_spawn_injects_kanban_db_and_workspaces_root(
        self, tmp_path, monkeypatch
    ):
        # The dispatcher's `_default_spawn` must inject HERMES_KANBAN_DB
        # and HERMES_KANBAN_WORKSPACES_ROOT into the worker env so the
        # worker converges on the dispatcher's paths even when the
        # `-p <profile>` flag rewrites HERMES_HOME.
        default_home = tmp_path / ".hermes"
        default_home.mkdir()
        self._set_home(monkeypatch, tmp_path, default_home)

        captured = {}

        class _FakePopen:
            # Stand-in for subprocess.Popen: records the command and env
            # it was given instead of starting a process.
            def __init__(self, cmd, **kwargs):
                captured["cmd"] = cmd
                captured["env"] = kwargs.get("env", {})
                self.pid = 4242

        monkeypatch.setattr("subprocess.Popen", _FakePopen)

        task = kb.Task(
            id="t_dispatch_env",
            title="x",
            body=None,
            assignee="coder",
            status="ready",
            priority=0,
            created_by=None,
            created_at=0,
            started_at=None,
            completed_at=None,
            workspace_kind="scratch",
            workspace_path=None,
            claim_lock=None,
            claim_expires=None,
            tenant=None,
        )
        kb._default_spawn(task, str(tmp_path / "ws"))

        env = captured["env"]
        assert env["HERMES_KANBAN_DB"] == str(default_home / "kanban.db")
        assert env["HERMES_KANBAN_WORKSPACES_ROOT"] == str(
            default_home / "kanban" / "workspaces"
        )
        assert env["HERMES_KANBAN_TASK"] == "t_dispatch_env"