fix(kanban): restrict board routing tools to orchestrators

Adapted from PR #20568 commit ce3518578 (Eric Litovsky / @kallidean).
Adds two-tier gating for the kanban tool surface so dispatcher-spawned
workers see only task-lifecycle tools (show/complete/block/heartbeat/
comment/create/link) while orchestrator profiles with `toolsets: [kanban]`
also see board-routing tools (kanban_list, kanban_unblock).

Workers shouldn't be enumerating or unblocking the board — they should
close their own task via the lifecycle tools. Hiding board-routing tools
from worker schemas keeps the worker focused and the toolset-isolation
contract honest.

Plus inherited from the same upstream commit:
- 50/200 row bound on kanban_list with `truncated` + `next_limit` metadata.
- Belt-and-suspenders runtime guard `_require_orchestrator_tool()` inside
  the orchestrator handlers in case a stale registration ever routes a
  worker to one of them.
- Tests for the new gate, the stricter bound, and the fact that even a
  worker with `toolsets: [kanban]` in config still doesn't see board
  routing.

Co-authored-by: Eric Litovsky <elitovsky@zenproject.net>
This commit is contained in:
Teknium 2026-05-09 23:05:06 -07:00
parent 50d281495e
commit 2704e7b67e
2 changed files with 169 additions and 62 deletions

View file

@ -40,7 +40,7 @@ def test_kanban_tools_hidden_without_env_var(monkeypatch, tmp_path):
def test_kanban_tools_visible_with_env_var(monkeypatch, tmp_path):
"""Worker sessions (HERMES_KANBAN_TASK set) must have kanban tools."""
"""Worker sessions get task lifecycle tools, not board-routing tools."""
monkeypatch.setenv("HERMES_KANBAN_TASK", "t_fake")
home = tmp_path / ".hermes"
home.mkdir()
@ -50,6 +50,59 @@ def test_kanban_tools_visible_with_env_var(monkeypatch, tmp_path):
from tools.registry import invalidate_check_fn_cache, registry
from toolsets import resolve_toolset
invalidate_check_fn_cache()
schema = registry.get_definitions(set(resolve_toolset("hermes-cli")), quiet=True)
names = {s["function"].get("name") for s in schema if "function" in s}
kanban = {n for n in names if n and n.startswith("kanban_")}
expected = {
"kanban_show", "kanban_complete", "kanban_block", "kanban_heartbeat",
"kanban_comment", "kanban_create", "kanban_link",
}
assert kanban == expected, f"expected {expected}, got {kanban}"
def test_worker_with_kanban_toolset_still_hides_board_routing(monkeypatch, tmp_path):
"""Task scope wins over profile config for board-routing tools.
Even if a worker process happens to also have ``toolsets: [kanban]``
in its config, the HERMES_KANBAN_TASK env var means it's a focused
worker and must not see kanban_list / kanban_unblock.
"""
monkeypatch.setenv("HERMES_KANBAN_TASK", "t_fake")
home = tmp_path / ".hermes"
home.mkdir()
(home / "config.yaml").write_text("toolsets:\n - kanban\n")
monkeypatch.setenv("HERMES_HOME", str(home))
import tools.kanban_tools # ensure registered
from tools.registry import invalidate_check_fn_cache, registry
from toolsets import resolve_toolset
invalidate_check_fn_cache()
schema = registry.get_definitions(set(resolve_toolset("hermes-cli")), quiet=True)
names = {s["function"].get("name") for s in schema if "function" in s}
kanban = {n for n in names if n and n.startswith("kanban_")}
assert {
"kanban_list",
"kanban_unblock",
}.isdisjoint(kanban), (
f"Board-routing tools leaked into worker schema: "
f"{kanban & {'kanban_list', 'kanban_unblock'}}"
)
def test_kanban_tools_visible_with_toolset_config(monkeypatch, tmp_path):
"""Orchestrator profiles with toolsets: [kanban] see all kanban tools."""
monkeypatch.delenv("HERMES_KANBAN_TASK", raising=False)
home = tmp_path / ".hermes"
home.mkdir()
(home / "config.yaml").write_text("toolsets:\n - kanban\n")
monkeypatch.setenv("HERMES_HOME", str(home))
import tools.kanban_tools # ensure registered
from tools.registry import invalidate_check_fn_cache, registry
from toolsets import resolve_toolset
invalidate_check_fn_cache()
schema = registry.get_definitions(set(resolve_toolset("hermes-cli")), quiet=True)
names = {s["function"].get("name") for s in schema if "function" in s}
@ -63,28 +116,6 @@ def test_kanban_tools_visible_with_env_var(monkeypatch, tmp_path):
assert kanban == expected, f"expected {expected}, got {kanban}"
def test_kanban_tools_visible_with_toolset_config(monkeypatch, tmp_path):
"""Orchestrator profiles with toolsets: [kanban] see the same tools."""
monkeypatch.delenv("HERMES_KANBAN_TASK", raising=False)
home = tmp_path / ".hermes"
home.mkdir()
(home / "config.yaml").write_text("toolsets:\n - kanban\n")
monkeypatch.setenv("HERMES_HOME", str(home))
import tools.kanban_tools # ensure registered
from tools.registry import invalidate_check_fn_cache, registry
from toolsets import resolve_toolset
invalidate_check_fn_cache()
schema = registry.get_definitions(set(resolve_toolset("hermes-cli")), quiet=True)
names = {s["function"].get("name") for s in schema if "function" in s}
kanban = {n for n in names if n and n.startswith("kanban_")}
assert {
"kanban_list",
"kanban_unblock",
}.issubset(kanban)
# ---------------------------------------------------------------------------
# Handler happy paths
# ---------------------------------------------------------------------------
@ -138,8 +169,9 @@ def test_show_explicit_task_id(worker_env):
assert d["task"]["id"] == other
def test_list_filters_tasks(worker_env):
def test_list_filters_tasks(monkeypatch, worker_env):
"""kanban_list gives orchestrators filtered board discovery."""
monkeypatch.delenv("HERMES_KANBAN_TASK", raising=False)
from hermes_cli import kanban_db as kb
conn = kb.connect()
try:
@ -168,19 +200,22 @@ def test_list_filters_tasks(worker_env):
assert tenant_ids == [c]
def test_list_rejects_invalid_status(worker_env):
def test_list_rejects_invalid_status(monkeypatch, worker_env):
monkeypatch.delenv("HERMES_KANBAN_TASK", raising=False)
from tools import kanban_tools as kt
out = kt._handle_list({"status": "not-a-state"})
assert "status must be one of" in json.loads(out).get("error", "")
def test_list_rejects_bad_limit(worker_env):
def test_list_rejects_bad_limit(monkeypatch, worker_env):
monkeypatch.delenv("HERMES_KANBAN_TASK", raising=False)
from tools import kanban_tools as kt
assert json.loads(kt._handle_list({"limit": "nope"})).get("error")
assert json.loads(kt._handle_list({"limit": 0})).get("error")
def test_list_parses_include_archived_string_false(worker_env):
def test_list_parses_include_archived_string_false(monkeypatch, worker_env):
monkeypatch.delenv("HERMES_KANBAN_TASK", raising=False)
from hermes_cli import kanban_db as kb
conn = kb.connect()
try:
@ -200,7 +235,8 @@ def test_list_parses_include_archived_string_false(worker_env):
assert archived not in ids
def test_list_parses_include_archived_string_true(worker_env):
def test_list_parses_include_archived_string_true(monkeypatch, worker_env):
monkeypatch.delenv("HERMES_KANBAN_TASK", raising=False)
from hermes_cli import kanban_db as kb
conn = kb.connect()
try:
@ -220,7 +256,8 @@ def test_list_parses_include_archived_string_true(worker_env):
assert archived in ids
def test_list_rejects_bad_include_archived(worker_env):
def test_list_rejects_bad_include_archived(monkeypatch, worker_env):
monkeypatch.delenv("HERMES_KANBAN_TASK", raising=False)
from tools import kanban_tools as kt
out = kt._handle_list({"include_archived": "sometimes"})
assert "include_archived must be" in json.loads(out).get("error", "")
@ -641,7 +678,8 @@ def test_unblock_happy_path(monkeypatch, worker_env):
conn.close()
def test_unblock_rejects_non_blocked_task(worker_env):
def test_unblock_rejects_non_blocked_task(monkeypatch, worker_env):
monkeypatch.delenv("HERMES_KANBAN_TASK", raising=False)
from tools import kanban_tools as kt
out = kt._handle_unblock({"task_id": worker_env})
assert json.loads(out).get("error")
@ -899,7 +937,13 @@ def test_worker_can_comment_on_foreign_task(worker_env):
def test_worker_unblock_rejects_foreign_task_id(worker_env):
"""A worker cannot unblock a task that isn't its own."""
"""A worker cannot unblock any task — kanban_unblock is orchestrator-only.
The check fires before the per-task ownership check, so the error
surface is the orchestrator-only refusal rather than the
cross-task-ownership refusal. Either is fine the property we're
pinning is "worker cannot mutate foreign task via kanban_unblock".
"""
from hermes_cli import kanban_db as kb
conn = kb.connect()
try:
@ -911,7 +955,10 @@ def test_worker_unblock_rejects_foreign_task_id(worker_env):
from tools import kanban_tools as kt
out = kt._handle_unblock({"task_id": other})
d = json.loads(out)
assert "refusing to mutate" in d.get("error", "")
err = d.get("error", "")
assert "orchestrator-only" in err or "refusing to mutate" in err, (
f"expected worker-rejection error, got {err}"
)
conn = kb.connect()
try:

View file

@ -39,22 +39,11 @@ logger = logging.getLogger(__name__)
# Gating
# ---------------------------------------------------------------------------
def _check_kanban_mode() -> bool:
"""Tools are available when:
KANBAN_LIST_DEFAULT_LIMIT = 50
KANBAN_LIST_MAX_LIMIT = 200
1. ``HERMES_KANBAN_TASK`` is set (dispatcher-spawned worker), OR
2. The current profile has ``kanban`` in its toolsets config
(orchestrator profiles like techlead that route work via Kanban).
Humans running ``hermes chat`` without the kanban toolset see zero
kanban tools. Workers spawned by the kanban dispatcher (gateway-
embedded by default) and orchestrator profiles with the kanban
toolset enabled see the Kanban tool surface.
"""
if os.environ.get("HERMES_KANBAN_TASK"):
return True
# Check if the current profile has the kanban toolset enabled.
def _profile_has_kanban_toolset() -> bool:
# Uses load_config() which has mtime-based caching, so this adds
# negligible overhead. The check_fn results are further TTL-cached
# (~30s) by the tool registry.
@ -67,6 +56,37 @@ def _check_kanban_mode() -> bool:
return False
def _check_kanban_mode() -> bool:
"""Task-lifecycle tools are available when:
1. ``HERMES_KANBAN_TASK`` is set (dispatcher-spawned worker), OR
2. The current profile has ``kanban`` in its toolsets config
(orchestrator profiles like techlead that route work via Kanban).
Humans running ``hermes chat`` without the kanban toolset see zero
kanban tools. Workers spawned by the kanban dispatcher (gateway-
embedded by default) and orchestrator profiles with the kanban
toolset enabled see the Kanban lifecycle tool surface.
"""
if os.environ.get("HERMES_KANBAN_TASK"):
return True
return _profile_has_kanban_toolset()
def _check_kanban_orchestrator_mode() -> bool:
"""Board-routing tools (kanban_list, kanban_unblock) are intentionally
hidden from task workers.
Dispatcher-spawned workers should close their own task via the
lifecycle tools (complete/block/heartbeat), not enumerate or unblock
board state. Profiles that explicitly opt into the kanban toolset
and are NOT scoped to a single task are the orchestrator surface.
"""
if os.environ.get("HERMES_KANBAN_TASK"):
return False
return _profile_has_kanban_toolset()
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
@ -159,6 +179,24 @@ def _parse_bool_arg(args: dict, name: str, *, default: bool = False):
return default, f"{name} must be a boolean or 'true'/'false'"
def _require_orchestrator_tool(tool_name: str) -> Optional[str]:
"""Belt-and-suspenders runtime guard for orchestrator-only handlers.
The check_fn (`_check_kanban_orchestrator_mode`) keeps these tools
out of the worker schema entirely, but in case a stale registration
or test harness routes a worker to one of them anyway, return a
structured tool_error so the model gets a clear refusal instead of
silently mutating board state from a worker context.
"""
if os.environ.get("HERMES_KANBAN_TASK"):
return tool_error(
f"{tool_name} is orchestrator-only; dispatcher-spawned workers "
"must use kanban_complete, kanban_block, kanban_heartbeat, or "
"kanban_comment for their assigned task."
)
return None
def _task_summary_dict(kb, conn, task) -> dict[str, Any]:
"""Compact task shape for board-listing tools."""
parents = kb.parent_ids(conn, task.id)
@ -261,6 +299,9 @@ def _handle_show(args: dict, **kw) -> str:
def _handle_list(args: dict, **kw) -> str:
"""List task summaries with the same core filters as the CLI."""
guard = _require_orchestrator_tool("kanban_list")
if guard:
return guard
assignee = args.get("assignee")
status = args.get("status")
tenant = args.get("tenant")
@ -268,30 +309,43 @@ def _handle_list(args: dict, **kw) -> str:
if bool_error:
return tool_error(bool_error)
limit = args.get("limit")
if limit is not None:
try:
limit = int(limit)
except (TypeError, ValueError):
return tool_error("limit must be an integer")
if limit < 1:
return tool_error("limit must be >= 1")
if limit is None:
limit = KANBAN_LIST_DEFAULT_LIMIT
try:
limit = int(limit)
except (TypeError, ValueError):
return tool_error("limit must be an integer")
if limit < 1:
return tool_error("limit must be >= 1")
if limit > KANBAN_LIST_MAX_LIMIT:
return tool_error(f"limit must be <= {KANBAN_LIST_MAX_LIMIT}")
try:
kb, conn = _connect()
try:
# Match CLI list: dependencies that cleared since the last
# dispatcher tick should be visible to orchestrators immediately.
promoted = kb.recompute_ready(conn)
tasks = kb.list_tasks(
# Fetch one extra row so model-facing output can report that
# a bounded listing was truncated without dumping the board.
rows = kb.list_tasks(
conn,
assignee=assignee,
status=status,
tenant=tenant,
include_archived=include_archived,
limit=limit,
limit=limit + 1,
)
truncated = len(rows) > limit
tasks = rows[:limit]
return json.dumps({
"tasks": [_task_summary_dict(kb, conn, t) for t in tasks],
"count": len(tasks),
"limit": limit,
"truncated": truncated,
"next_limit": (
min(limit * 2, KANBAN_LIST_MAX_LIMIT)
if truncated and limit < KANBAN_LIST_MAX_LIMIT else None
),
"promoted": promoted,
})
finally:
@ -564,6 +618,9 @@ def _handle_create(args: dict, **kw) -> str:
def _handle_unblock(args: dict, **kw) -> str:
"""Transition a blocked task back to ready."""
guard = _require_orchestrator_tool("kanban_unblock")
if guard:
return guard
tid = args.get("task_id")
if not tid:
return tool_error("task_id is required")
@ -643,7 +700,10 @@ KANBAN_LIST_SCHEMA = {
"work to route. Supports the same core filters as the CLI: assignee, "
"status, tenant, include_archived, and limit. Returns compact rows "
"with ids, title, status, assignee, priority, parent/child ids, and "
"counts. Also recomputes ready tasks before listing, matching the CLI."
"counts. Bounded to 50 rows by default, 200 max, with truncation "
"metadata. Also recomputes ready tasks before listing, matching the "
"CLI. Orchestrator-only — dispatcher-spawned task workers never see "
"this tool."
),
"parameters": {
"type": "object",
@ -670,7 +730,7 @@ KANBAN_LIST_SCHEMA = {
},
"limit": {
"type": "integer",
"description": "Optional maximum number of tasks to return.",
"description": "Optional maximum rows to return (default 50, max 200).",
},
},
"required": [],
@ -948,9 +1008,9 @@ KANBAN_CREATE_SCHEMA = {
KANBAN_UNBLOCK_SCHEMA = {
"name": "kanban_unblock",
"description": (
"Move a blocked Kanban task back to ready. Dispatcher-spawned "
"workers may only unblock their own task; orchestrator profiles "
"with the kanban toolset can unblock routed work."
"Move a blocked Kanban task back to ready. Orchestrator-only — only "
"profiles with the kanban toolset can unblock routed work; "
"dispatcher-spawned task workers never see this tool."
),
"parameters": {
"type": "object",
@ -1000,7 +1060,7 @@ registry.register(
toolset="kanban",
schema=KANBAN_LIST_SCHEMA,
handler=_handle_list,
check_fn=_check_kanban_mode,
check_fn=_check_kanban_orchestrator_mode,
emoji="📋",
)
@ -1054,7 +1114,7 @@ registry.register(
toolset="kanban",
schema=KANBAN_UNBLOCK_SCHEMA,
handler=_handle_unblock,
check_fn=_check_kanban_mode,
check_fn=_check_kanban_orchestrator_mode,
emoji="",
)