hermes-agent/tools/kanban_tools.py
pdonizete deb59eab72 fix: allow kanban tools for orchestrator profiles with kanban toolset
The _check_kanban_mode() gating function only checked for
HERMES_KANBAN_TASK env var, which is only set by the dispatcher
when spawning workers. This prevented orchestrator profiles (like
techlead) from using kanban_create, kanban_link, etc. even when
they had 'kanban' explicitly in their toolsets config.

Now uses load_config() from hermes_cli.config (which has mtime-based
caching) to check if 'kanban' is in the profile's toolsets list.
This enables orchestrators to route work via Kanban while workers
continue using the dispatcher env var.

Fixes #18968
2026-05-04 02:00:42 -07:00

744 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Kanban tools — structured tool-call surface for worker + orchestrator agents.
These tools are only registered into the model's schema when the agent is
running under the dispatcher (env var ``HERMES_KANBAN_TASK`` set). A
normal ``hermes chat`` session sees **zero** kanban tools in its schema.
Why tools instead of just shelling out to ``hermes kanban``?
1. **Backend portability.** A worker whose terminal tool points at Docker
/ Modal / Singularity / SSH would run ``hermes kanban complete …``
inside the container, where ``hermes`` isn't installed and the DB
isn't mounted. Tools run in the agent's Python process, so they
always reach ``~/.hermes/kanban.db`` regardless of terminal backend.
2. **No shell-quoting footguns.** Passing ``--metadata '{"x": [...]}'``
through shlex+argparse is fragile. Structured tool args skip it.
3. **Better errors.** Tool-call failures return structured JSON the
model can reason about, not stderr strings it has to parse.
Humans continue to use the CLI (``hermes kanban …``), the dashboard
(``hermes dashboard``), and the slash command (``/kanban …``) — all
three bypass the agent entirely. The tools are ONLY for the worker
agent's handoff back to the kernel.
"""
from __future__ import annotations
import json
import logging
import os
from typing import Any, Optional
from tools.registry import registry, tool_error
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Gating
# ---------------------------------------------------------------------------
def _check_kanban_mode() -> bool:
"""Tools are available when:
1. ``HERMES_KANBAN_TASK`` is set (dispatcher-spawned worker), OR
2. The current profile has ``kanban`` in its toolsets config
(orchestrator profiles like techlead that route work via Kanban).
Humans running ``hermes chat`` without the kanban toolset see zero
kanban tools. Workers spawned by the kanban dispatcher (gateway-
embedded by default) and orchestrator profiles with the kanban
toolset enabled see all seven.
"""
if os.environ.get("HERMES_KANBAN_TASK"):
return True
# Check if the current profile has the kanban toolset enabled.
# Uses load_config() which has mtime-based caching, so this adds
# negligible overhead. The check_fn results are further TTL-cached
# (~30s) by the tool registry.
try:
from hermes_cli.config import load_config
cfg = load_config()
toolsets = cfg.get("toolsets", [])
return "kanban" in toolsets
except Exception:
return False
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _default_task_id(arg: Optional[str]) -> Optional[str]:
"""Resolve ``task_id`` arg or fall back to the env var the dispatcher set."""
if arg:
return arg
env_tid = os.environ.get("HERMES_KANBAN_TASK")
return env_tid or None
def _connect():
"""Import + connect lazily so the module imports cleanly in non-kanban
contexts (e.g. test rigs that import every tool module)."""
from hermes_cli import kanban_db as kb
return kb, kb.connect()
def _ok(**fields: Any) -> str:
return json.dumps({"ok": True, **fields})
# ---------------------------------------------------------------------------
# Handlers
# ---------------------------------------------------------------------------
def _handle_show(args: dict, **kw) -> str:
"""Read a task's full state: task row, parents, children, comments,
runs (attempt history), and the last N events."""
tid = _default_task_id(args.get("task_id"))
if not tid:
return tool_error(
"task_id is required (or set HERMES_KANBAN_TASK in the env)"
)
try:
kb, conn = _connect()
try:
task = kb.get_task(conn, tid)
if task is None:
return tool_error(f"task {tid} not found")
comments = kb.list_comments(conn, tid)
events = kb.list_events(conn, tid)
runs = kb.list_runs(conn, tid)
parents = kb.parent_ids(conn, tid)
children = kb.child_ids(conn, tid)
def _task_dict(t):
return {
"id": t.id, "title": t.title, "body": t.body,
"assignee": t.assignee, "status": t.status,
"tenant": t.tenant, "priority": t.priority,
"workspace_kind": t.workspace_kind,
"workspace_path": t.workspace_path,
"created_by": t.created_by, "created_at": t.created_at,
"started_at": t.started_at,
"completed_at": t.completed_at,
"result": t.result,
"current_run_id": t.current_run_id,
}
def _run_dict(r):
return {
"id": r.id, "profile": r.profile,
"status": r.status, "outcome": r.outcome,
"summary": r.summary, "error": r.error,
"metadata": r.metadata,
"started_at": r.started_at, "ended_at": r.ended_at,
}
return json.dumps({
"task": _task_dict(task),
"parents": parents,
"children": children,
"comments": [
{"author": c.author, "body": c.body,
"created_at": c.created_at}
for c in comments
],
"events": [
{"kind": e.kind, "payload": e.payload,
"created_at": e.created_at, "run_id": e.run_id}
for e in events[-50:] # cap; full log via CLI
],
"runs": [_run_dict(r) for r in runs],
# Also surface the worker's own context block so the
# agent can include it directly if it wants. This is
# the same string build_worker_context returns to the
# dispatcher at spawn time.
"worker_context": kb.build_worker_context(conn, tid),
})
finally:
conn.close()
except Exception as e:
logger.exception("kanban_show failed")
return tool_error(f"kanban_show: {e}")
def _handle_complete(args: dict, **kw) -> str:
"""Mark the current task done with a structured handoff."""
tid = _default_task_id(args.get("task_id"))
if not tid:
return tool_error(
"task_id is required (or set HERMES_KANBAN_TASK in the env)"
)
summary = args.get("summary")
metadata = args.get("metadata")
result = args.get("result")
if not (summary or result):
return tool_error(
"provide at least one of: summary (preferred), result"
)
if metadata is not None and not isinstance(metadata, dict):
return tool_error(
f"metadata must be an object/dict, got {type(metadata).__name__}"
)
try:
kb, conn = _connect()
try:
ok = kb.complete_task(
conn, tid,
result=result, summary=summary, metadata=metadata,
)
if not ok:
return tool_error(
f"could not complete {tid} (unknown id or already terminal)"
)
run = kb.latest_run(conn, tid)
return _ok(task_id=tid, run_id=run.id if run else None)
finally:
conn.close()
except Exception as e:
logger.exception("kanban_complete failed")
return tool_error(f"kanban_complete: {e}")
def _handle_block(args: dict, **kw) -> str:
"""Transition the task to blocked with a reason a human will read."""
tid = _default_task_id(args.get("task_id"))
if not tid:
return tool_error(
"task_id is required (or set HERMES_KANBAN_TASK in the env)"
)
reason = args.get("reason")
if not reason or not str(reason).strip():
return tool_error("reason is required — explain what input you need")
try:
kb, conn = _connect()
try:
ok = kb.block_task(conn, tid, reason=reason)
if not ok:
return tool_error(
f"could not block {tid} (unknown id or not in "
f"running/ready)"
)
run = kb.latest_run(conn, tid)
return _ok(task_id=tid, run_id=run.id if run else None)
finally:
conn.close()
except Exception as e:
logger.exception("kanban_block failed")
return tool_error(f"kanban_block: {e}")
def _handle_heartbeat(args: dict, **kw) -> str:
"""Signal that the worker is still alive during a long operation."""
tid = _default_task_id(args.get("task_id"))
if not tid:
return tool_error(
"task_id is required (or set HERMES_KANBAN_TASK in the env)"
)
note = args.get("note")
try:
kb, conn = _connect()
try:
ok = kb.heartbeat_worker(conn, tid, note=note)
if not ok:
return tool_error(
f"could not heartbeat {tid} (unknown id or not running)"
)
return _ok(task_id=tid)
finally:
conn.close()
except Exception as e:
logger.exception("kanban_heartbeat failed")
return tool_error(f"kanban_heartbeat: {e}")
def _handle_comment(args: dict, **kw) -> str:
"""Append a comment to a task's thread."""
tid = args.get("task_id")
if not tid:
return tool_error(
"task_id is required (use the current task id if that's what "
"you mean — pulls from env but kept explicit here)"
)
body = args.get("body")
if not body or not str(body).strip():
return tool_error("body is required")
author = args.get("author") or os.environ.get("HERMES_PROFILE") or "worker"
try:
kb, conn = _connect()
try:
cid = kb.add_comment(conn, tid, author=author, body=str(body))
return _ok(task_id=tid, comment_id=cid)
finally:
conn.close()
except Exception as e:
logger.exception("kanban_comment failed")
return tool_error(f"kanban_comment: {e}")
def _handle_create(args: dict, **kw) -> str:
"""Create a child task. Orchestrator workers use this to fan out.
``parents`` can be a list of task ids; dependency-gated promotion
works as usual.
"""
title = args.get("title")
if not title or not str(title).strip():
return tool_error("title is required")
assignee = args.get("assignee")
if not assignee:
return tool_error(
"assignee is required — name the profile that should execute this "
"task (the dispatcher will only spawn tasks with an assignee)"
)
body = args.get("body")
parents = args.get("parents") or []
tenant = args.get("tenant") or os.environ.get("HERMES_TENANT")
priority = args.get("priority")
workspace_kind = args.get("workspace_kind") or "scratch"
workspace_path = args.get("workspace_path")
triage = bool(args.get("triage"))
idempotency_key = args.get("idempotency_key")
max_runtime_seconds = args.get("max_runtime_seconds")
skills = args.get("skills")
if isinstance(skills, str):
# Accept a single skill name as a string for convenience.
skills = [skills]
if skills is not None and not isinstance(skills, (list, tuple)):
return tool_error(
f"skills must be a list of skill names, got {type(skills).__name__}"
)
if isinstance(parents, str):
parents = [parents]
if not isinstance(parents, (list, tuple)):
return tool_error(
f"parents must be a list of task ids, got {type(parents).__name__}"
)
try:
kb, conn = _connect()
try:
new_tid = kb.create_task(
conn,
title=str(title).strip(),
body=body,
assignee=str(assignee),
parents=tuple(parents),
tenant=tenant,
priority=int(priority) if priority is not None else 0,
workspace_kind=str(workspace_kind),
workspace_path=workspace_path,
triage=triage,
idempotency_key=idempotency_key,
max_runtime_seconds=(
int(max_runtime_seconds)
if max_runtime_seconds is not None else None
),
skills=skills,
created_by=os.environ.get("HERMES_PROFILE") or "worker",
)
new_task = kb.get_task(conn, new_tid)
return _ok(
task_id=new_tid,
status=new_task.status if new_task else None,
)
finally:
conn.close()
except Exception as e:
logger.exception("kanban_create failed")
return tool_error(f"kanban_create: {e}")
def _handle_link(args: dict, **kw) -> str:
"""Add a parent→child dependency edge after the fact."""
parent_id = args.get("parent_id")
child_id = args.get("child_id")
if not parent_id or not child_id:
return tool_error("both parent_id and child_id are required")
try:
kb, conn = _connect()
try:
kb.link_tasks(conn, parent_id=parent_id, child_id=child_id)
return _ok(parent_id=parent_id, child_id=child_id)
finally:
conn.close()
except ValueError as e:
# Covers cycle + self-parent rejections
return tool_error(f"kanban_link: {e}")
except Exception as e:
logger.exception("kanban_link failed")
return tool_error(f"kanban_link: {e}")
# ---------------------------------------------------------------------------
# Schemas
# ---------------------------------------------------------------------------
_DESC_TASK_ID_DEFAULT = (
"Task id. If omitted, defaults to HERMES_KANBAN_TASK from the env "
"(the task the dispatcher spawned you to work on)."
)
KANBAN_SHOW_SCHEMA = {
"name": "kanban_show",
"description": (
"Read a task's full state — title, body, assignee, parent task "
"handoffs, your prior attempts on this task if any, comments, "
"and recent events. Use this to (re)orient yourself before "
"starting work, especially on retries. The response includes a "
"pre-formatted ``worker_context`` string suitable for inclusion "
"verbatim in your reasoning."
),
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": _DESC_TASK_ID_DEFAULT,
},
},
"required": [],
},
}
KANBAN_COMPLETE_SCHEMA = {
"name": "kanban_complete",
"description": (
"Mark your current task done with a structured handoff for "
"downstream workers and humans. Prefer ``summary`` for a "
"human-readable 1-3 sentence description of what you did; put "
"machine-readable facts in ``metadata`` (changed_files, "
"tests_run, decisions, findings, etc). At least one of "
"``summary`` or ``result`` is required."
),
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": _DESC_TASK_ID_DEFAULT,
},
"summary": {
"type": "string",
"description": (
"Human-readable handoff, 1-3 sentences. Appears in "
"Run History on the dashboard and in downstream "
"workers' context."
),
},
"metadata": {
"type": "object",
"description": (
"Free-form dict of structured facts about this "
"attempt — {\"changed_files\": [...], \"tests_run\": 12, "
"\"findings\": [...]}. Surfaced to downstream "
"workers alongside ``summary``."
),
},
"result": {
"type": "string",
"description": (
"Short result log line (legacy field, maps to "
"task.result). Use ``summary`` instead when "
"possible; this exists for compatibility with "
"callers that still set --result on the CLI."
),
},
},
"required": [],
},
}
KANBAN_BLOCK_SCHEMA = {
"name": "kanban_block",
"description": (
"Transition the task to blocked because you need human input "
"to proceed. ``reason`` will be shown to the human on the "
"board and included in context when someone unblocks you. "
"Use for genuine blockers only — don't block on things you can "
"resolve yourself."
),
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": _DESC_TASK_ID_DEFAULT,
},
"reason": {
"type": "string",
"description": (
"What you need answered, in one or two sentences. "
"Don't paste the whole conversation; the human has "
"the board and can ask follow-ups via comments."
),
},
},
"required": ["reason"],
},
}
KANBAN_HEARTBEAT_SCHEMA = {
"name": "kanban_heartbeat",
"description": (
"Signal that you're still alive during a long operation "
"(training, encoding, large crawls). Call every few minutes so "
"humans see liveness separately from PID checks. Pure side "
"effect — no work changes."
),
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": _DESC_TASK_ID_DEFAULT,
},
"note": {
"type": "string",
"description": (
"Optional short note describing current progress. "
"Shown in the event log."
),
},
},
"required": [],
},
}
KANBAN_COMMENT_SCHEMA = {
"name": "kanban_comment",
"description": (
"Append a comment to a task's thread. Use for durable notes "
"that should outlive this run (questions for the next worker, "
"partial findings, rationale). Ephemeral reasoning doesn't "
"belong here — use your normal response instead."
),
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": (
"Task id. Required (may be your own task or "
"another's — comment threads are per-task)."
),
},
"body": {
"type": "string",
"description": "Markdown-supported comment body.",
},
"author": {
"type": "string",
"description": (
"Override author name. Defaults to the current "
"profile (HERMES_PROFILE env)."
),
},
},
"required": ["task_id", "body"],
},
}
KANBAN_CREATE_SCHEMA = {
"name": "kanban_create",
"description": (
"Create a new kanban task, optionally as a child of the current "
"one (pass the current task id in ``parents``). Used by "
"orchestrator workers to fan out — decompose work into child "
"tasks with specific assignees, link them into a pipeline, "
"then complete your own task. The dispatcher picks up the new "
"tasks on its next tick and spawns the assigned profiles."
),
"parameters": {
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "Short task title (required).",
},
"assignee": {
"type": "string",
"description": (
"Profile name that should execute this task "
"(e.g. 'researcher-a', 'reviewer', 'writer'). "
"Required — tasks without an assignee are never "
"dispatched."
),
},
"body": {
"type": "string",
"description": (
"Opening post: full spec, acceptance criteria, "
"links. The assigned worker reads this as part of "
"its context."
),
},
"parents": {
"type": "array",
"items": {"type": "string"},
"description": (
"Parent task ids. The new task stays in 'todo' "
"until every parent reaches 'done'; then it "
"auto-promotes to 'ready'. Typical fan-in: list "
"all the researcher task ids when creating a "
"synthesizer task."
),
},
"tenant": {
"type": "string",
"description": (
"Optional namespace for multi-project isolation. "
"Defaults to HERMES_TENANT env if set."
),
},
"priority": {
"type": "integer",
"description": (
"Dispatcher tiebreaker. Higher = picked sooner "
"when multiple ready tasks share an assignee."
),
},
"workspace_kind": {
"type": "string",
"enum": ["scratch", "dir", "worktree"],
"description": (
"Workspace flavor: 'scratch' (fresh tmp dir, "
"default), 'dir' (shared directory, requires "
"absolute workspace_path), 'worktree' (git worktree)."
),
},
"workspace_path": {
"type": "string",
"description": (
"Absolute path for 'dir' or 'worktree' workspace. "
"Relative paths are rejected at dispatch."
),
},
"triage": {
"type": "boolean",
"description": (
"If true, task lands in 'triage' instead of 'todo' "
"— a specifier profile is expected to flesh out "
"the body before work starts."
),
},
"idempotency_key": {
"type": "string",
"description": (
"If a non-archived task with this key already "
"exists, return that task's id instead of creating "
"a duplicate. Useful for retry-safe automation."
),
},
"max_runtime_seconds": {
"type": "integer",
"description": (
"Per-task runtime cap. When exceeded, the "
"dispatcher SIGTERMs the worker and re-queues the "
"task with outcome='timed_out'."
),
},
"skills": {
"type": "array",
"items": {"type": "string"},
"description": (
"Skill names to force-load into the dispatched "
"worker (in addition to the built-in kanban-worker "
"skill). Use this to pin a task to a specialist "
"context — e.g. ['translation'] for a translation "
"task, ['github-code-review'] for a reviewer task. "
"The names must match skills installed on the "
"assignee's profile."
),
},
},
"required": ["title", "assignee"],
},
}
KANBAN_LINK_SCHEMA = {
"name": "kanban_link",
"description": (
"Add a parent→child dependency edge after both tasks already "
"exist. The child won't promote to 'ready' until all parents "
"are 'done'. Cycles and self-links are rejected."
),
"parameters": {
"type": "object",
"properties": {
"parent_id": {"type": "string", "description": "Parent task id."},
"child_id": {"type": "string", "description": "Child task id."},
},
"required": ["parent_id", "child_id"],
},
}
# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------
registry.register(
name="kanban_show",
toolset="kanban",
schema=KANBAN_SHOW_SCHEMA,
handler=_handle_show,
check_fn=_check_kanban_mode,
emoji="📋",
)
registry.register(
name="kanban_complete",
toolset="kanban",
schema=KANBAN_COMPLETE_SCHEMA,
handler=_handle_complete,
check_fn=_check_kanban_mode,
emoji="",
)
registry.register(
name="kanban_block",
toolset="kanban",
schema=KANBAN_BLOCK_SCHEMA,
handler=_handle_block,
check_fn=_check_kanban_mode,
emoji="",
)
registry.register(
name="kanban_heartbeat",
toolset="kanban",
schema=KANBAN_HEARTBEAT_SCHEMA,
handler=_handle_heartbeat,
check_fn=_check_kanban_mode,
emoji="💓",
)
registry.register(
name="kanban_comment",
toolset="kanban",
schema=KANBAN_COMMENT_SCHEMA,
handler=_handle_comment,
check_fn=_check_kanban_mode,
emoji="💬",
)
registry.register(
name="kanban_create",
toolset="kanban",
schema=KANBAN_CREATE_SCHEMA,
handler=_handle_create,
check_fn=_check_kanban_mode,
emoji="",
)
registry.register(
name="kanban_link",
toolset="kanban",
schema=KANBAN_LINK_SCHEMA,
handler=_handle_link,
check_fn=_check_kanban_mode,
emoji="🔗",
)