feat(kanban): orchestrator-driven auto-decomposition on triage (#27572)

* feat(kanban): orchestrator-driven auto-decomposition on triage

Closes the core gap in the kanban system: dropping a one-liner into Triage
now decomposes it into a graph of child tasks routed to specialist
profiles by description, matching teknium's original vision ("main
orchestrator splits/creates actual tasks, doles them out to each agent").

The build
---------
- hermes_cli/profiles.py: new `description` + `description_auto` fields
  on ProfileInfo, persisted in <profile_dir>/profile.yaml. Helpers
  read_profile_meta / write_profile_meta. `create_profile` accepts
  optional description.
- hermes_cli/profile_describer.py: new module — auto-generate a 1-2
  sentence description from a profile's skills + model + name via the
  auxiliary LLM (`auxiliary.profile_describer`).
- hermes_cli/main.py: new `hermes profile create --description ...`
  flag; new `hermes profile describe [name] [--text ... | --auto |
  --all --auto]` subcommand.
- hermes_cli/kanban_db.py: new `decompose_triage_task` atomic helper —
  creates N child tasks, links the root as a child of every leaf
  (root waits for the whole graph), flips root `triage -> todo` with
  orchestrator assignee, records an audit comment + `decomposed` event
  in a single write_txn.
- hermes_cli/kanban_decompose.py: new module — calls the auxiliary LLM
  (`auxiliary.kanban_decomposer`) with the profile roster + descriptions
  to produce a JSON task graph, then invokes the DB helper. Rewrites
  unknown assignees to the configured `kanban.default_assignee` (or
  the active default profile) so a task NEVER lands with assignee=None.
  Falls back to specify-style single-task promotion when the LLM
  returns `fanout: false`.
- hermes_cli/kanban.py: new `hermes kanban decompose [task_id | --all]`
  CLI verb.
- hermes_cli/config.py: new DEFAULT_CONFIG keys —
  kanban.orchestrator_profile, kanban.default_assignee,
  kanban.auto_decompose (default True), kanban.auto_decompose_per_tick
  (default 3), auxiliary.kanban_decomposer, auxiliary.profile_describer.
- gateway/run.py: kanban dispatcher watcher now runs auto-decompose
  before each `_tick_once`, capped by `auto_decompose_per_tick` so a
  bulk-load of triage tasks doesn't burst-spend the aux LLM.
- plugins/kanban/dashboard/plugin_api.py: new endpoints —
  GET /profiles (list roster + descriptions),
  PATCH /profiles/<name> (set description, user-authored),
  POST /profiles/<name>/describe-auto (LLM-generate),
  POST /tasks/<id>/decompose (run decomposer),
  GET/PUT /orchestration (orchestrator/default-assignee/auto-decompose
  pickers, with resolved fallbacks echoed back).
- plugins/kanban/dashboard/dist/index.js: new OrchestrationPanel
  collapsible — dropdowns for orchestrator profile and default
  assignee, auto-decompose toggle, per-profile description editor with
  Save and Auto-generate buttons. New ⚗ Decompose button next to
   Specify on triage-column task drawers.

Behavior
--------
- A task in Triage gets fanned out into a small DAG of child tasks.
  Children with no internal parents flip to `ready` immediately
  (parallel dispatch). Children with sibling parents wait. The root
  stays alive as a parent of every child — when the whole graph
  finishes, it promotes to `ready` and the orchestrator profile wakes
  back up to judge completion (the "adds more tasks until done" part
  of the original vision).
- `kanban.orchestrator_profile` unset -> falls back to the default
  profile (whichever `hermes` launches with no -p flag).
- `kanban.default_assignee` unset -> same fallback. Tasks NEVER end
  up unassigned.
- `kanban.auto_decompose=true` (default) runs the decomposer
  automatically on dispatcher ticks; manual `hermes kanban decompose`
  is always available.

Tests
-----
- tests/hermes_cli/test_kanban_decompose_db.py — 7 tests for the
  atomic DB helper (status transitions, dep graph, audit trail,
  validation errors).
- tests/hermes_cli/test_kanban_decompose.py — 6 tests for the
  decomposer module (fanout, no-fanout fallback, unknown-assignee
  rewrite, malformed-JSON resilience, no-aux-client path).
- tests/hermes_cli/test_profile_describer.py — 10 tests for
  profile.yaml r/w + the LLM auto-describer (yaml corrupt tolerance,
  user-vs-auto description protection, --overwrite, fallback parsing).

E2E
---
- CLI end-to-end: created profiles with descriptions, dropped a triage
  task, mocked the aux LLM with a 3-task graph -> verified all three
  children were created with the right assignees, the dependency
  edges matched the LLM's graph, root flipped to todo gated by every
  child, audit comment + `decomposed` event recorded.
- Dashboard end-to-end: started the dashboard against an isolated
  HERMES_HOME, verified all four new endpoints via curl (profile
  listing, PATCH for description, PUT for orchestration settings,
  POST for decompose). Opened the UI in the browser, confirmed the
  OrchestrationPanel renders with all three pickers + the per-profile
  description editor, typed a description, clicked Save, verified
  ~/.hermes/profile.yaml was written. Clicked Decompose on the triage
  card and confirmed the inline error message surfaced as designed
  ("no auxiliary client configured").

* feat(kanban): surface decompose mode (Auto/Manual) as a one-click pill

The auto/manual toggle already existed as kanban.auto_decompose (default
true), but it was buried inside the collapsed Orchestration settings
panel — users couldn't tell at a glance which mode they were in. This
hoists it to a pill at the top of the kanban page so the state is always
visible and one click flips it.

UX
- New "⚗ Decompose: AUTO|MANUAL" pill in the kanban header. Emerald
  styling when Auto is on (the default), muted/gray when Manual.
- Pill is visible both in the collapsed AND expanded Orchestration
  settings views so context is preserved when the user opens the panel.
- Tooltip explains both states + what clicking does.
- Renamed the in-panel "Auto-decompose on triage / Enabled" checkbox
  to "Decompose mode / Auto (default) | Manual" for language parity
  with the pill.

Behavior preserved
- Default remains Auto (kanban.auto_decompose=true).
- Manual mode restores pre-PR behavior: triage tasks stay in triage
  until the user clicks ⚗ Decompose on each card (or runs
  `hermes kanban decompose <id>`).

Implementation
- plugins/kanban/dashboard/dist/index.js: load /orchestration on mount
  (not just on expand) so the collapsed pill reflects real state.
  Render mode pill in both collapsed and expanded headers. Reuses the
  existing PUT /api/plugins/kanban/orchestration endpoint — no new
  backend, no new tests required.

E2E verified
- Pill renders as "⚗ Decompose: AUTO" on page load (default).
- One click flips to "⚗ Decompose: MANUAL" with muted styling.
- config.yaml on disk shows auto_decompose: false after the flip.
- Second click round-trips back to Auto; config.yaml flips to true.

* feat(kanban): rename mode pill to "Orchestration: Auto/Manual"

Per Teknium feedback — "Decompose" was too implementation-specific.
"Orchestration" is the user-facing concept (the whole pitch is the
orchestrator profile routing work), and the pill is the front door to it.

- Pill text: "Orchestration: Auto" / "Orchestration: Manual" (title case,
  no ⚗ prefix, no SHOUTY-CAPS for the mode value)
- In-panel checkbox label: "Orchestration mode" (was "Decompose mode")
- Tooltips updated to match
- No behavior change

* docs(kanban): document decompose, profile descriptions, orchestration mode

Brings the docs site up to parity with the PR. English build verified
locally (npx docusaurus build --locale en) — clean, no new broken links
or anchors. Pre-existing broken-link warnings (rl-training, llms.txt,
step-by-step-checklist, fallback-model) untouched.

- website/docs/reference/cli-commands.md
    + `hermes kanban decompose` action row in the action table, with
      pointer to the Auto vs Manual orchestration section.

- website/docs/reference/profile-commands.md
    + `--description "<text>"` flag on `hermes profile create`.
    + Full `hermes profile describe` section: read, --text, --auto,
      --overwrite, --all flags with examples.

- website/docs/user-guide/features/kanban.md (the big one)
    + Triage column intro rewritten around the Auto-decompose default
      behavior, with pointer to the new Auto vs Manual section.
    + Status action row updated to mention both ⚗ Decompose and
       Specify on triage cards.
    + New "Auto vs Manual orchestration" section explaining the two
      modes, how to flip them (pill, config), how routing-by-description
      works, the no-None-assignee guarantee, plus a config knob table
      (auto_decompose, auto_decompose_per_tick, orchestrator_profile,
      default_assignee) and the two new auxiliary slots
      (kanban_decomposer, profile_describer).
    + REST surface table gains 6 new endpoint rows: /tasks/:id/decompose,
      /profiles (GET), /profiles/:name (PATCH), /profiles/:name/describe-auto,
      /orchestration (GET + PUT).

- website/docs/user-guide/features/kanban-tutorial.md
    + Triage column blurb updated for Auto by default + Manual via the
      pill, with cross-link to the Auto vs Manual orchestration section.

- website/docs/user-guide/profiles.md
    + Blank-profile flow now mentions --description and points to the
      kanban routing model for context.

- website/docs/user-guide/configuration.md
    + `kanban_decomposer` and `profile_describer` added to the
      `hermes model -> Configure auxiliary models` menu listing.
This commit is contained in:
Teknium 2026-05-17 13:54:12 -07:00 committed by GitHub
parent 04b4f765cc
commit 1345dda0cf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 2698 additions and 3 deletions

View file

@ -925,6 +925,31 @@ DEFAULT_CONFIG = {
"timeout": 120,
"extra_body": {},
},
# Kanban decomposer — decomposes a triage task into a graph of
# child tasks routed to specialist profiles by description.
# Invoked by ``hermes kanban decompose`` and the kanban
# auto-decompose dispatcher tick. Returns a JSON task graph;
# uses more tokens than the specifier so allow more headroom.
"kanban_decomposer": {
"provider": "auto",
"model": "",
"base_url": "",
"api_key": "",
"timeout": 180,
"extra_body": {},
},
# Profile describer — auto-generates a 1-2 sentence description
# of what a profile is good at. Invoked by
# ``hermes profile describe <name> --auto`` and the dashboard's
# auto-generate button. Short, cheap call.
"profile_describer": {
"provider": "auto",
"model": "",
"base_url": "",
"api_key": "",
"timeout": 60,
"extra_body": {},
},
# Curator — skill-usage review fork. Timeout is generous because the
# review pass can take several minutes on reasoning models (umbrella
# building over hundreds of candidate skills). "auto" = use main chat
@ -1466,6 +1491,25 @@ DEFAULT_CONFIG = {
# same task/profile (spawn_failed, timed_out, or crashed). Reassignment
# resets the streak for the new profile.
"failure_limit": 2,
# Profile that decomposes tasks in the Triage column. When unset,
# falls back to the default profile (the one `hermes` launches with
# no -p flag). Set this to a dedicated 'orchestrator' profile if you
# want decomposition to use a different model/skills from your main
# working profile.
"orchestrator_profile": "",
# Where a child task lands if the orchestrator can't match an
# assignee to any installed profile. When unset, falls back to the
# default profile. A task never ends up with assignee=None.
"default_assignee": "",
# When true, the kanban dispatcher auto-runs the decomposer on
# tasks that land in Triage (every dispatcher tick). When false,
# decomposition is manual via `hermes kanban decompose <id>` or
# the dashboard's Decompose button.
"auto_decompose": True,
# Max triage tasks to decompose per dispatcher tick. Prevents a
# large bulk-load of triage tasks from spending a burst of aux
# LLM calls in one tick. Excess tasks defer to the next tick.
"auto_decompose_per_tick": 3,
},
# execute_code settings — controls the tool used for programmatic tool calls.

View file

@ -610,6 +610,43 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu
help="Emit one JSON object per task on stdout",
)
# --- decompose --- (triage → fan-out via auxiliary LLM + orchestrator)
p_decompose = sub.add_parser(
"decompose",
help="Decompose a triage-column task into a graph of child tasks "
"routed to specialist profiles by description. Falls back to "
"specify-style single-task promotion when the task doesn't "
"benefit from fan-out. Uses auxiliary.kanban_decomposer.",
)
p_decompose.add_argument(
"task_id",
nargs="?",
default=None,
help="Task id to decompose (required unless --all is given)",
)
p_decompose.add_argument(
"--all",
dest="all_triage",
action="store_true",
help="Decompose every task currently in the triage column",
)
p_decompose.add_argument(
"--tenant",
default=None,
help="When used with --all, restrict the sweep to this tenant",
)
p_decompose.add_argument(
"--author",
default=None,
help="Author name recorded on the audit comment "
"(default: $HERMES_PROFILE or 'decomposer')",
)
p_decompose.add_argument(
"--json",
action="store_true",
help="Emit one JSON object per task on stdout",
)
# --- gc ---
p_gc = sub.add_parser(
"gc", help="Garbage-collect archived-task workspaces, old events, and old logs",
@ -740,6 +777,7 @@ def kanban_command(args: argparse.Namespace) -> int:
"notify-unsubscribe": _cmd_notify_unsubscribe,
"context": _cmd_context,
"specify": _cmd_specify,
"decompose": _cmd_decompose,
"gc": _cmd_gc,
}
handler = handlers.get(action)
@ -2115,6 +2153,87 @@ def _cmd_specify(args: argparse.Namespace) -> int:
return 0 if (ok_count > 0 or not ids) else 1
def _cmd_decompose(args: argparse.Namespace) -> int:
"""Fan a triage task (or all of them) out into a graph of child
tasks via the auxiliary LLM, routed to specialist profiles by
description. Thin wrapper over ``kanban_decompose``."""
from hermes_cli import kanban_decompose as decomp
all_flag = bool(getattr(args, "all_triage", False))
tenant = getattr(args, "tenant", None)
author = getattr(args, "author", None) or _profile_author()
want_json = bool(getattr(args, "json", False))
if args.task_id and all_flag:
print(
"kanban: pass either a task id OR --all, not both",
file=sys.stderr,
)
return 2
if all_flag:
ids = decomp.list_triage_ids(tenant=tenant)
if not ids:
msg = (
"No triage tasks"
+ (f" for tenant {tenant!r}" if tenant else "")
+ "."
)
if want_json:
print(json.dumps({"decomposed": 0, "total": 0}))
else:
print(msg)
return 0
elif args.task_id:
ids = [args.task_id]
else:
print(
"kanban: decompose requires a task id or --all",
file=sys.stderr,
)
return 2
ok_count = 0
for tid in ids:
outcome = decomp.decompose_task(tid, author=author)
if outcome.ok:
ok_count += 1
if want_json:
print(json.dumps({
"task_id": outcome.task_id,
"ok": outcome.ok,
"reason": outcome.reason,
"fanout": outcome.fanout,
"child_ids": outcome.child_ids,
"new_title": outcome.new_title,
}))
elif outcome.ok:
if outcome.fanout and outcome.child_ids:
child_summary = ", ".join(outcome.child_ids)
print(
f"Decomposed {outcome.task_id}{len(outcome.child_ids)} "
f"children ({child_summary}); root promoted to todo"
)
else:
title_suffix = (
f" — retitled: {outcome.new_title!r}"
if outcome.new_title
else ""
)
print(
f"Specified {outcome.task_id} → todo "
f"(no fanout){title_suffix}"
)
else:
print(
f"kanban: decompose {outcome.task_id}: {outcome.reason}",
file=sys.stderr,
)
if not all_flag:
return 0 if ok_count == 1 else 1
return 0 if (ok_count > 0 or not ids) else 1
def _cmd_gc(args: argparse.Namespace) -> int:
"""Remove scratch workspaces of archived tasks, prune old events, and
delete old worker logs."""

View file

@ -2777,6 +2777,180 @@ def specify_triage_task(
return True
def decompose_triage_task(
conn: sqlite3.Connection,
task_id: str,
*,
root_assignee: Optional[str],
children: list[dict],
author: Optional[str] = None,
) -> Optional[list[str]]:
"""Fan a triage task out into child tasks and promote the root to ``todo``.
The root task stays alive and becomes the parent of every child
when all children reach ``done``, the root promotes to ``ready`` and
its assignee (typically the orchestrator profile) wakes back up to
judge completion or spawn more work.
``children`` is a list of dicts, each shaped like::
{
"title": "...",
"body": "...", # optional
"assignee": "profile-name", # optional, None -> default fallback
"parents": [0, 2], # indices into this same children list
}
Returns the list of created child task ids (in input order) on
success. Returns ``None`` when:
- The root task does not exist
- The root task is not in ``triage``
- A cycle would result (caller built a bad graph)
Validation of titles/assignees happens inside the same write_txn as
the inserts so a malformed entry aborts the whole decomposition
cleanly (no orphan children).
"""
if not children:
return None
if root_assignee is not None:
root_assignee = _canonical_assignee(root_assignee)
# Pre-validate the children list shape outside the txn. Cheap checks
# that don't need DB access. Bad input aborts before we touch the DB.
for idx, child in enumerate(children):
if not isinstance(child, dict):
raise ValueError(f"child[{idx}] is not a dict")
title = child.get("title")
if not isinstance(title, str) or not title.strip():
raise ValueError(f"child[{idx}].title is required")
parents_idx = child.get("parents") or []
if not isinstance(parents_idx, list):
raise ValueError(f"child[{idx}].parents must be a list")
for p in parents_idx:
if not isinstance(p, int) or p < 0 or p >= len(children):
raise ValueError(
f"child[{idx}].parents[{p}] is not a valid index into children"
)
if p == idx:
raise ValueError(f"child[{idx}] cannot list itself as a parent")
# We do the full decomposition in a SINGLE write_txn so it's
# atomic: either every child is created AND the root flips to
# ``todo``, or nothing changes. We deliberately do NOT call any
# kb helper that opens its own write_txn (create_task, link_tasks,
# add_comment) from inside this block — see architecture.md
# write_txn pitfalls. Instead we inline the INSERTs and
# _append_event calls.
now = int(time.time())
child_ids: list[str] = []
with write_txn(conn):
root_row = conn.execute(
"SELECT id, status, tenant FROM tasks WHERE id = ?", (task_id,)
).fetchone()
if root_row is None:
return None
if root_row["status"] != "triage":
return None
tenant = root_row["tenant"]
# Create children. Status is 'todo' regardless of parents — we
# link them under the root AFTER creation so the dispatcher
# sees a coherent state, and recompute_ready() at the end
# promotes parent-free children to 'ready'.
for idx, child in enumerate(children):
new_id = _new_task_id()
title = child["title"].strip()
body = child.get("body")
assignee = _canonical_assignee(child.get("assignee"))
conn.execute(
"INSERT INTO tasks "
"(id, title, body, assignee, status, workspace_kind, "
" tenant, created_at, created_by) "
"VALUES (?, ?, ?, ?, 'todo', 'scratch', ?, ?, ?)",
(
new_id,
title,
body if isinstance(body, str) else None,
assignee,
tenant,
now,
(author or "decomposer"),
),
)
_append_event(
conn, new_id, "created",
{"by": author or "decomposer", "from_decompose_of": task_id},
)
child_ids.append(new_id)
# Link children to their sibling parents (within the decomposed graph).
for idx, child in enumerate(children):
for p_idx in child.get("parents") or []:
parent_id = child_ids[p_idx]
child_id = child_ids[idx]
conn.execute(
"INSERT OR IGNORE INTO task_links (parent_id, child_id) "
"VALUES (?, ?)",
(parent_id, child_id),
)
_append_event(
conn, child_id, "linked",
{"parent": parent_id, "child": child_id},
)
# Link the ROOT task as a child of every leaf child — i.e. the
# root waits for the whole graph. Simpler than computing leaves:
# link root under every child. Cycle-free because the root is
# only ever a child here, never a parent of children.
for cid in child_ids:
conn.execute(
"INSERT OR IGNORE INTO task_links (parent_id, child_id) "
"VALUES (?, ?)",
(cid, task_id),
)
# Flip the root: triage -> todo, set assignee to the orchestrator.
sets = ["status = 'todo'"]
params: list[Any] = []
if root_assignee is not None:
sets.append("assignee = ?")
params.append(root_assignee)
params.append(task_id)
conn.execute(
f"UPDATE tasks SET {', '.join(sets)} WHERE id = ?",
tuple(params),
)
# Audit comment + event on the root so the timeline shows the fan-out.
if author and author.strip():
conn.execute(
"INSERT INTO task_comments (task_id, author, body, created_at) "
"VALUES (?, ?, ?, ?)",
(
task_id,
author.strip(),
"Decomposed into "
+ ", ".join(child_ids)
+ ". Root will wake when all children complete.",
now,
),
)
_append_event(
conn, task_id, "decomposed",
{
"child_ids": child_ids,
"root_assignee": root_assignee,
},
)
# Outside the write_txn: promote parent-free children to 'ready'
# so the dispatcher picks them up on its next tick. Same pattern
# specify_triage_task uses.
recompute_ready(conn)
return child_ids
def archive_task(conn: sqlite3.Connection, task_id: str) -> bool:
with write_txn(conn):
cur = conn.execute(

View file

@ -0,0 +1,440 @@
"""Kanban decomposer — fan a triage task out into a graph of child tasks.
Invoked by ``hermes kanban decompose [task_id | --all]`` and the
auto-decompose path in the gateway dispatcher loop. Reads the user's
profile roster (with descriptions) and asks the auxiliary LLM to
return a task graph in JSON. Then atomically creates the children,
links them under the root, and flips the root ``triage -> todo``.
The root task stays alive and becomes the parent of every leaf child,
so when the whole graph completes the root wakes back up its
assignee (the orchestrator profile) gets a chance to judge completion
and add more tasks if the work isn't done yet.
Design notes
------------
* Mirrors the shape of ``hermes_cli/kanban_specify.py``: lazy aux
client import inside the function, lenient response parse, never
raises on expected failure modes.
* The system prompt sees the *configured* profile roster names plus
descriptions plus the default fallback. Profiles without a
description are still listed (with a note) so the orchestrator can
match on name as a fallback, but the user has an obvious incentive
to describe them.
* ``fanout=false`` collapses to the same effect as ``kanban specify``:
we tighten the body and flip ``triage -> todo`` as a single task,
no children created. This makes ``decompose`` a strict superset of
``specify`` from the user's perspective.
* If the LLM picks an assignee that doesn't exist as a profile, we
rewrite it to the configured ``default_assignee`` (or the default
profile if unset). A child task NEVER ends up with ``assignee=None``.
"""
from __future__ import annotations
import json
import logging
import os
import re
from dataclasses import dataclass
from typing import Optional
from hermes_cli import kanban_db as kb
from hermes_cli import profiles as profiles_mod
logger = logging.getLogger(__name__)
_SYSTEM_PROMPT = """You are the Kanban decomposer for the Hermes Agent board.
A user dropped a rough idea into the Triage column. Your job is to break it
into a small graph of concrete child tasks and route each one to the best-
matching profile from the available roster.
You will be given:
- The original task title and body
- The list of available profiles (each with name + description)
- The fallback "default_assignee" used when no profile fits
Output a single JSON object with this exact shape:
{
"fanout": true,
"rationale": "<one sentence on why this decomposition>",
"tasks": [
{
"title": "<concrete task title, imperative voice, <= 80 chars>",
"body": "<detailed spec for the worker on this child task>",
"assignee": "<profile name from the roster, or null for default>",
"parents": [<int>, ...]
},
...
]
}
Rules:
- "parents" is a list of INDICES (0-based) into this same "tasks" list,
expressing actual data dependencies. Tasks with no parents run in
PARALLEL. Tasks with parents wait until every parent completes.
- Prefer parallelism. If two tasks can be done independently, give
them no parents so the dispatcher fans them out at once.
- Use 2-6 tasks for normal work. Don't create 20 tiny tasks. Don't
cram everything into 1 task.
- Pick assignees from the roster by matching the task to the profile's
DESCRIPTION (not just the name). When nothing matches well, use null
and the system will route to the default_assignee.
- Each child task body is what a fresh worker will read with no other
context be specific about goal, approach, and acceptance criteria.
When the task is genuinely a single unit of work (no useful decomposition),
return:
{
"fanout": false,
"rationale": "<one sentence>",
"title": "<tightened title>",
"body": "<concrete spec for a single worker>"
}
In that case the task stays as one work item, just with a tightened spec.
No preamble, no closing remarks, no code fences. Output only the JSON object.
"""
_USER_TEMPLATE = """Task id: {task_id}
Title: {title}
Body:
{body}
Available profiles (assignees you may pick from):
{roster}
Default assignee (used when no profile fits a task): {default_assignee}
"""
_FENCE_RE = re.compile(r"^```(?:json)?\s*|\s*```$", re.MULTILINE)
@dataclass
class DecomposeOutcome:
"""Result of decomposing a single triage task."""
task_id: str
ok: bool
reason: str = ""
fanout: bool = False
child_ids: list[str] | None = None
new_title: Optional[str] = None
def _truncate(text: str, limit: int) -> str:
if len(text) <= limit:
return text
return text[: limit - 1] + ""
def _extract_json_blob(raw: str) -> Optional[dict]:
if not raw:
return None
stripped = _FENCE_RE.sub("", raw.strip())
first = stripped.find("{")
last = stripped.rfind("}")
if first == -1 or last == -1 or last <= first:
return None
candidate = stripped[first : last + 1]
try:
val = json.loads(candidate)
except (ValueError, json.JSONDecodeError):
return None
if not isinstance(val, dict):
return None
return val
def _profile_author() -> str:
"""Mirror of ``hermes_cli.kanban._profile_author``."""
return (
os.environ.get("HERMES_PROFILE")
or os.environ.get("USER")
or "decomposer"
)
def _load_config() -> dict:
try:
from hermes_cli.config import load_config
return load_config() or {}
except Exception:
return {}
def _resolve_orchestrator_profile(cfg: dict) -> str:
"""Resolve which profile owns decomposition.
Falls back to the active default profile when ``kanban.orchestrator_profile``
is unset, so a task is never stranded for lack of an orchestrator.
"""
kanban_cfg = cfg.get("kanban", {}) if isinstance(cfg, dict) else {}
explicit = (kanban_cfg.get("orchestrator_profile") or "").strip()
if explicit:
try:
if profiles_mod.profile_exists(explicit):
return explicit
except Exception:
pass
# Fall back to the active default profile.
try:
return profiles_mod.get_active_profile_name() or "default"
except Exception:
return "default"
def _resolve_default_assignee(cfg: dict) -> str:
"""Resolve which profile catches child tasks the orchestrator can't route."""
kanban_cfg = cfg.get("kanban", {}) if isinstance(cfg, dict) else {}
explicit = (kanban_cfg.get("default_assignee") or "").strip()
if explicit:
try:
if profiles_mod.profile_exists(explicit):
return explicit
except Exception:
pass
try:
return profiles_mod.get_active_profile_name() or "default"
except Exception:
return "default"
def _build_roster() -> tuple[list[dict], set[str]]:
"""Return (roster_for_prompt, valid_assignee_names).
Each roster entry is ``{name, description, has_description}``. The
valid-set is used after the LLM responds to rewrite invalid
assignees to the default fallback.
"""
roster: list[dict] = []
valid: set[str] = set()
try:
all_profiles = profiles_mod.list_profiles()
except Exception as exc:
logger.warning("decompose: failed to list profiles: %s", exc)
return roster, valid
for p in all_profiles:
desc = (p.description or "").strip()
roster.append({
"name": p.name,
"description": desc or f"(no description; profile named {p.name!r})",
"has_description": bool(desc),
})
valid.add(p.name)
return roster, valid
def _format_roster(roster: list[dict]) -> str:
if not roster:
return " (no profiles installed — decomposer cannot route work)"
lines = []
for entry in roster:
tag = "" if entry["has_description"] else " ⚠ undescribed"
lines.append(f" - {entry['name']}{tag}: {entry['description']}")
return "\n".join(lines)
def decompose_task(
task_id: str,
*,
author: Optional[str] = None,
timeout: Optional[int] = None,
) -> DecomposeOutcome:
"""Decompose a triage task into a graph of child tasks.
Returns an outcome describing what happened. Never raises for
expected failure modes (task not in triage, no aux client
configured, API error, malformed response, decomposer returned
fanout=true with empty task list) those surface via ``ok=False``.
"""
with kb.connect() as conn:
task = kb.get_task(conn, task_id)
if task is None:
return DecomposeOutcome(task_id, False, "unknown task id")
if task.status != "triage":
return DecomposeOutcome(
task_id, False, f"task is not in triage (status={task.status!r})"
)
cfg = _load_config()
orchestrator = _resolve_orchestrator_profile(cfg)
default_assignee = _resolve_default_assignee(cfg)
roster, valid_names = _build_roster()
try:
from agent.auxiliary_client import ( # type: ignore
get_auxiliary_extra_body,
get_text_auxiliary_client,
)
except Exception as exc:
logger.debug("decompose: auxiliary client import failed: %s", exc)
return DecomposeOutcome(task_id, False, "auxiliary client unavailable")
try:
client, model = get_text_auxiliary_client("kanban_decomposer")
except Exception as exc:
logger.debug("decompose: get_text_auxiliary_client failed: %s", exc)
return DecomposeOutcome(task_id, False, "auxiliary client unavailable")
if client is None or not model:
return DecomposeOutcome(task_id, False, "no auxiliary client configured")
user_msg = _USER_TEMPLATE.format(
task_id=task.id,
title=_truncate(task.title or "", 400),
body=_truncate(task.body or "(no body)", 4000),
roster=_format_roster(roster),
default_assignee=default_assignee,
)
try:
resp = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": _SYSTEM_PROMPT},
{"role": "user", "content": user_msg},
],
temperature=0.3,
max_tokens=4000,
timeout=timeout or 180,
extra_body=get_auxiliary_extra_body() or None,
)
except Exception as exc:
logger.info(
"decompose: API call failed for %s (%s)", task_id, exc,
)
return DecomposeOutcome(task_id, False, f"LLM error: {type(exc).__name__}")
try:
raw = resp.choices[0].message.content or ""
except Exception:
raw = ""
parsed = _extract_json_blob(raw)
if parsed is None:
return DecomposeOutcome(task_id, False, "LLM returned malformed JSON")
fanout = bool(parsed.get("fanout"))
audit_author = author or _profile_author()
if not fanout:
# Fall back to single-task spec promotion (same effect as specify).
new_title = parsed.get("title")
new_body = parsed.get("body")
title_val = new_title.strip() if isinstance(new_title, str) and new_title.strip() else None
body_val = new_body if isinstance(new_body, str) and new_body.strip() else None
if title_val is None and body_val is None:
return DecomposeOutcome(
task_id, False, "decomposer returned fanout=false with no title/body",
)
with kb.connect() as conn:
ok = kb.specify_triage_task(
conn,
task_id,
title=title_val,
body=body_val,
author=audit_author,
)
if not ok:
return DecomposeOutcome(
task_id, False, "task moved out of triage before promotion",
)
return DecomposeOutcome(
task_id, True, "single task (no fanout)",
fanout=False, new_title=title_val,
)
raw_tasks = parsed.get("tasks") or []
if not isinstance(raw_tasks, list) or not raw_tasks:
return DecomposeOutcome(
task_id, False, "decomposer returned fanout=true with empty tasks list",
)
# Rewrite invalid assignees to the default fallback. Never leave a
# task with assignee=None — the user explicitly does not want that.
children: list[dict] = []
for idx, entry in enumerate(raw_tasks):
if not isinstance(entry, dict):
return DecomposeOutcome(
task_id, False, f"tasks[{idx}] is not an object",
)
title = entry.get("title")
if not isinstance(title, str) or not title.strip():
return DecomposeOutcome(
task_id, False, f"tasks[{idx}].title is missing or empty",
)
body = entry.get("body")
if not isinstance(body, str):
body = ""
assignee = entry.get("assignee")
if not isinstance(assignee, str) or not assignee.strip():
chosen = default_assignee
elif assignee not in valid_names:
logger.info(
"decompose: task %s child %d picked unknown assignee %r"
"routing to default_assignee %r",
task_id, idx, assignee, default_assignee,
)
chosen = default_assignee
else:
chosen = assignee
parents = entry.get("parents") or []
if not isinstance(parents, list):
parents = []
# Clean parent indices: drop non-int and out-of-range.
clean_parents = [p for p in parents if isinstance(p, int) and 0 <= p < len(raw_tasks) and p != idx]
children.append({
"title": title.strip()[:200],
"body": body.strip(),
"assignee": chosen,
"parents": clean_parents,
})
try:
with kb.connect() as conn:
child_ids = kb.decompose_triage_task(
conn,
task_id,
root_assignee=orchestrator,
children=children,
author=audit_author,
)
except ValueError as exc:
return DecomposeOutcome(task_id, False, f"DB rejected graph: {exc}")
except Exception as exc:
logger.exception("decompose: DB error on task %s", task_id)
return DecomposeOutcome(task_id, False, f"DB error: {type(exc).__name__}")
if child_ids is None:
return DecomposeOutcome(
task_id, False, "task moved out of triage before decomposition",
)
return DecomposeOutcome(
task_id, True, f"decomposed into {len(child_ids)} children",
fanout=True, child_ids=child_ids,
)
def list_triage_ids(*, tenant: Optional[str] = None) -> list[str]:
"""Return task ids currently in the triage column."""
with kb.connect() as conn:
rows = kb.list_tasks(
conn,
status="triage",
tenant=tenant,
limit=1000,
)
return [row.id for row in rows]

View file

@ -9043,6 +9043,7 @@ def cmd_profile(args):
clone_config=clone,
no_alias=no_alias,
no_skills=no_skills,
description=getattr(args, "description", None),
)
print(f"\nProfile '{name}' created at {profile_dir}")
@ -9142,6 +9143,107 @@ def cmd_profile(args):
print(f"Error: {e}")
sys.exit(1)
elif action == "describe":
# Read or write a profile's description. The description is
# consumed by the kanban decomposer to route tasks based on
# role instead of name alone.
from hermes_cli import profiles as _profiles_mod
all_flag = bool(getattr(args, "all_missing", False))
auto_flag = bool(getattr(args, "auto", False))
overwrite_flag = bool(getattr(args, "overwrite", False))
text_value = getattr(args, "text", None)
name = getattr(args, "profile_name", None)
if all_flag and not auto_flag:
print("profile describe: --all requires --auto", file=sys.stderr)
sys.exit(2)
if all_flag and (text_value or name):
print(
"profile describe: --all is mutually exclusive with a profile name / --text",
file=sys.stderr,
)
sys.exit(2)
if not all_flag and not name:
print("profile describe: profile name is required (or --all --auto)", file=sys.stderr)
sys.exit(2)
if text_value and auto_flag:
print(
"profile describe: --text is mutually exclusive with --auto",
file=sys.stderr,
)
sys.exit(2)
# Show current description if no operation requested.
if name and not text_value and not auto_flag:
try:
if _profiles_mod.normalize_profile_name(name) == "default":
from hermes_constants import get_hermes_home as _hh
profile_dir = Path(_hh())
else:
profile_dir = _profiles_mod.get_profile_dir(name)
except Exception as exc:
print(f"Error: {exc}", file=sys.stderr)
sys.exit(1)
if not profile_dir.is_dir():
print(f"Error: profile '{name}' not found", file=sys.stderr)
sys.exit(1)
meta = _profiles_mod.read_profile_meta(profile_dir)
desc = meta.get("description") or ""
if not desc:
print(f"(no description set for '{name}')")
else:
tag = "[auto] " if meta.get("description_auto") else ""
print(f"{tag}{desc}")
sys.exit(0)
# --text path: just write the user-authored description.
if text_value:
try:
if _profiles_mod.normalize_profile_name(name) == "default":
from hermes_constants import get_hermes_home as _hh
profile_dir = Path(_hh())
else:
profile_dir = _profiles_mod.get_profile_dir(name)
_profiles_mod.write_profile_meta(
profile_dir,
description=text_value,
description_auto=False,
)
print(f"Description updated for '{name}'.")
except Exception as exc:
print(f"Error: {exc}", file=sys.stderr)
sys.exit(1)
sys.exit(0)
# --auto path: invoke the LLM describer.
from hermes_cli import profile_describer as _pd
if all_flag:
targets = _pd.list_describable_profiles(missing_only=True)
if not targets:
print("All profiles already have descriptions.")
sys.exit(0)
else:
targets = [name]
ok_count = 0
fail_count = 0
for tgt in targets:
outcome = _pd.describe_profile(tgt, overwrite=overwrite_flag)
if outcome.ok:
ok_count += 1
print(f"Described '{outcome.profile_name}': {outcome.description}")
else:
fail_count += 1
print(
f"profile describe {outcome.profile_name}: {outcome.reason}",
file=sys.stderr,
)
if not all_flag:
sys.exit(0 if ok_count == 1 else 1)
sys.exit(0 if ok_count > 0 else 1)
elif action == "show":
name = args.profile_name
from hermes_cli.profiles import (
@ -12023,6 +12125,13 @@ Examples:
action="store_true",
help="Create an empty profile with no bundled skills (opts out of `hermes update` skill sync)",
)
profile_create.add_argument(
"--description",
default=None,
help="One- or two-sentence description of what this profile is good at. "
"Used by the kanban decomposer to route tasks based on role instead "
"of profile name alone. Skip and add later via `hermes profile describe`.",
)
profile_delete = profile_subparsers.add_parser("delete", help="Delete a profile")
profile_delete.add_argument("profile_name", help="Profile to delete")
@ -12030,6 +12139,40 @@ Examples:
"-y", "--yes", action="store_true", help="Skip confirmation prompt"
)
profile_describe = profile_subparsers.add_parser(
"describe",
help="Read or set a profile's description (used by the kanban orchestrator)",
)
profile_describe.add_argument(
"profile_name",
nargs="?",
default=None,
help="Profile to describe (omit + use --all --auto to sweep)",
)
profile_describe.add_argument(
"--text",
default=None,
help="Set description to this exact text (overwrites any existing description)",
)
profile_describe.add_argument(
"--auto",
action="store_true",
help="Auto-generate description via the auxiliary LLM "
"(uses auxiliary.profile_describer)",
)
profile_describe.add_argument(
"--overwrite",
action="store_true",
help="With --auto, replace user-authored descriptions too (default: only "
"fill in missing or previously-auto descriptions)",
)
profile_describe.add_argument(
"--all",
dest="all_missing",
action="store_true",
help="With --auto, run on every profile missing a description",
)
profile_show = profile_subparsers.add_parser("show", help="Show profile details")
profile_show.add_argument("profile_name", help="Profile to show")

View file

@ -0,0 +1,299 @@
"""Profile describer — auto-generate ``description`` for a profile.
Used by ``hermes profile describe <name> --auto`` and the dashboard's
"auto-generate description" button. Reads the profile's installed
skills, model+provider, name, and optionally a small slice of memory,
then asks the auxiliary LLM to produce a 1-2 sentence description of
what the profile is good at.
Result is written to ``<profile_dir>/profile.yaml`` with
``description_auto: true`` so the dashboard can surface a "review"
badge. User can edit afterward to confirm.
Design notes
------------
- Mirrors the shape of ``hermes_cli/kanban_specify.py``: lazy aux
client import inside the function, lenient response parse, never
raises on expected failure modes.
- Reads at most ``MAX_SKILLS_FOR_PROMPT`` skill names to keep the
prompt bounded. No skill body names + categories are enough
signal and avoid blowing context on profiles with 100+ skills.
- Memory is intentionally NOT read here. Memories are personal and
the orchestrator routes work to a *role* not a *biography*. If we
find later that memory adds signal we can wire it; for now,
skills + name + model is plenty.
"""
from __future__ import annotations
import json
import logging
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from hermes_cli import profiles as profiles_mod
logger = logging.getLogger(__name__)
# Cap on how many skill names we feed the LLM. Profiles with 200+
# skills (uncommon but possible) would blow context otherwise. The cap
# is per-category — see _collect_skills.
MAX_SKILLS_FOR_PROMPT = 60
_SYSTEM_PROMPT = """You are a profile-describer for the Hermes Agent kanban board.
A user runs multiple "profiles" distinct agent identities, each with their
own skills, model, and configuration. The kanban board's orchestrator routes
work to whichever profile best fits each task. To do that well, every
profile needs a short, concrete description of what it's good at.
You are given a profile's:
- Name
- Model / provider
- List of installed skill names (a strong signal of role / domain)
Produce a single JSON object with exactly one key:
{
"description": "<1-2 sentence description, plain prose, no preamble>"
}
Rules:
- The description is what an orchestrator will read to decide whether to
route a task here. Lead with the profile's strongest capability.
- Stay concrete. Bad: "an AI agent that helps users."
Good: "Reads and modifies Python codebases — runs tests,
refactors functions, opens GitHub PRs."
- 1-2 sentences, <= 280 characters total.
- Never invent capabilities the skills don't suggest.
- Never write "Hermes Agent profile" or other meta-narration.
- No code fences, no preamble, no closing remarks. Output only JSON.
"""
_USER_TEMPLATE = """Profile name: {name}
Default model: {model}
Provider: {provider}
Installed skill count: {skill_count}
Notable skills (up to {skill_cap}):
{skill_list}
"""
_FENCE_RE = re.compile(r"^```(?:json)?\s*|\s*```$", re.MULTILINE)
@dataclass
class DescribeOutcome:
"""Result of describing a single profile."""
profile_name: str
ok: bool
reason: str = ""
description: Optional[str] = None
def _collect_skills(profile_dir: Path) -> list[str]:
"""Return a stable, capped list of skill names for the prompt.
Format: ``category/skill_name`` where category is the immediate
subdir under ``skills/`` (e.g. ``devops``, ``research``). Skills
that live directly under ``skills/`` show as bare ``skill_name``.
"""
skills_dir = profile_dir / "skills"
if not skills_dir.is_dir():
return []
names: list[str] = []
for md in skills_dir.rglob("SKILL.md"):
path_str = str(md)
if "/.hub/" in path_str or "/.git/" in path_str:
continue
try:
rel = md.relative_to(skills_dir)
except ValueError:
continue
parts = rel.parts[:-1] # drop SKILL.md filename
if not parts:
continue
# parts[-1] is the skill dir name; parts[:-1] is the category path
if len(parts) == 1:
names.append(parts[0])
else:
names.append(f"{parts[0]}/{parts[-1]}")
names.sort()
# Keep within prompt budget. Skills earlier in alphabet aren't more
# important — we'll let the LLM see a sample. Pick evenly-spaced
# entries instead of just the head so a profile with skills A..Z
# doesn't get described as "starts with A".
if len(names) <= MAX_SKILLS_FOR_PROMPT:
return names
step = len(names) / MAX_SKILLS_FOR_PROMPT
sampled = [names[int(i * step)] for i in range(MAX_SKILLS_FOR_PROMPT)]
return sampled
def _extract_json_blob(raw: str) -> Optional[dict]:
if not raw:
return None
stripped = _FENCE_RE.sub("", raw.strip())
first = stripped.find("{")
last = stripped.rfind("}")
if first == -1 or last == -1 or last <= first:
return None
candidate = stripped[first : last + 1]
try:
val = json.loads(candidate)
except (ValueError, json.JSONDecodeError):
return None
if not isinstance(val, dict):
return None
return val
def describe_profile(
profile_name: str,
*,
overwrite: bool = False,
timeout: Optional[int] = None,
) -> DescribeOutcome:
"""Auto-generate a description for one profile.
Returns an outcome describing what happened. Never raises for
expected failure modes (profile missing, no aux client configured,
API error, malformed response) those surface via ``ok=False`` so
a sweep can continue past individual failures.
``overwrite`` controls whether an existing user-authored description
is replaced. By default we refuse to overwrite a description with
``description_auto: false`` to protect curated text. Auto-generated
descriptions (``description_auto: true``) are always replaceable.
"""
canon = profiles_mod.normalize_profile_name(profile_name)
if not profiles_mod.profile_exists(canon):
# Special case: "default" exists as a virtual profile name
# mapped to the default home dir. profile_exists() handles it.
return DescribeOutcome(canon, False, "profile not found")
try:
if canon == "default":
from hermes_constants import get_hermes_home # type: ignore
profile_dir = Path(get_hermes_home())
else:
profile_dir = profiles_mod.get_profile_dir(canon)
except Exception as exc:
return DescribeOutcome(canon, False, f"cannot resolve profile dir: {exc}")
# Honor curated descriptions unless --overwrite.
existing = profiles_mod.read_profile_meta(profile_dir)
if existing.get("description") and not existing.get("description_auto") and not overwrite:
return DescribeOutcome(
canon,
False,
"profile already has a user-authored description "
"(use --overwrite to replace)",
)
skill_names = _collect_skills(profile_dir)
skill_list = "\n".join(f" - {n}" for n in skill_names) or " (no skills installed)"
skill_count = sum(
1 for _ in (profile_dir / "skills").rglob("SKILL.md")
if "/.hub/" not in str(_) and "/.git/" not in str(_)
) if (profile_dir / "skills").is_dir() else 0
# Read model + provider from the profile's config.
try:
model, provider = profiles_mod._read_config_model(profile_dir)
except Exception:
model, provider = None, None
try:
from agent.auxiliary_client import ( # type: ignore
get_auxiliary_extra_body,
get_text_auxiliary_client,
)
except Exception as exc:
logger.debug("describe: auxiliary client import failed: %s", exc)
return DescribeOutcome(canon, False, "auxiliary client unavailable")
try:
client, aux_model = get_text_auxiliary_client("profile_describer")
except Exception as exc:
logger.debug("describe: get_text_auxiliary_client failed: %s", exc)
return DescribeOutcome(canon, False, "auxiliary client unavailable")
if client is None or not aux_model:
return DescribeOutcome(canon, False, "no auxiliary client configured")
user_msg = _USER_TEMPLATE.format(
name=canon,
model=(model or "(unset)"),
provider=(provider or "(unset)"),
skill_count=skill_count,
skill_cap=MAX_SKILLS_FOR_PROMPT,
skill_list=skill_list,
)
try:
resp = client.chat.completions.create(
model=aux_model,
messages=[
{"role": "system", "content": _SYSTEM_PROMPT},
{"role": "user", "content": user_msg},
],
temperature=0.3,
max_tokens=400,
timeout=timeout or 60,
extra_body=get_auxiliary_extra_body() or None,
)
except Exception as exc:
logger.info("describe: API call failed for %s (%s)", canon, exc)
return DescribeOutcome(canon, False, f"LLM error: {type(exc).__name__}")
try:
raw = resp.choices[0].message.content or ""
except Exception:
raw = ""
parsed = _extract_json_blob(raw)
if parsed is None:
# Fall back: take the raw text trimmed to one paragraph.
text = raw.strip().split("\n\n", 1)[0]
if not text:
return DescribeOutcome(canon, False, "LLM returned an empty response")
description = text[:280]
else:
val = parsed.get("description")
if not isinstance(val, str) or not val.strip():
return DescribeOutcome(
canon, False, "LLM response missing 'description' field"
)
description = val.strip()[:280]
try:
profiles_mod.write_profile_meta(
profile_dir,
description=description,
description_auto=True,
)
except Exception as exc:
return DescribeOutcome(canon, False, f"failed to write profile.yaml: {exc}")
return DescribeOutcome(canon, True, "described", description=description)
def list_describable_profiles(*, missing_only: bool = True) -> list[str]:
"""Return profile names that can be described.
``missing_only=True`` (default) returns only profiles without a
description. ``missing_only=False`` returns every profile.
"""
out: list[str] = []
for p in profiles_mod.list_profiles():
if missing_only and (p.description or "").strip() and not p.description_auto:
continue
out.append(p.name)
return out

View file

@ -412,6 +412,17 @@ class ProfileInfo:
distribution_name: Optional[str] = None
distribution_version: Optional[str] = None
distribution_source: Optional[str] = None
# Free-form description (1-2 sentences) of what this profile is good
# at. Persisted in ``<profile_dir>/profile.yaml``. Empty when the
# user has not described the profile (legacy profiles, fresh
# installs). Surfaced to the kanban decomposer so it can route work
# to the right profile based on role rather than name alone.
description: str = ""
# When True, ``description`` was auto-generated by the LLM
# describer and has not been confirmed by the user. The dashboard
# surfaces a "review" badge in this case so the user can edit or
# accept.
description_auto: bool = False
def _read_distribution_meta(profile_dir: Path) -> tuple:
@ -479,6 +490,82 @@ def _count_skills(profile_dir: Path) -> int:
return count
# ---------------------------------------------------------------------------
# profile.yaml — per-profile metadata (description, role, etc.)
# ---------------------------------------------------------------------------
#
# We keep this file deliberately tiny and separate from the profile's
# ``config.yaml``. ``config.yaml`` is the user-facing Hermes config
# (~5000 lines of defaults); ``profile.yaml`` is metadata ABOUT the
# profile itself (its role, who described it). Mixing them makes both
# harder to read.
#
# Missing file -> empty defaults; never an error. The kanban decomposer
# tolerates empty descriptions and just falls back to the profile name.
def _profile_yaml_path(profile_dir: Path) -> Path:
return profile_dir / "profile.yaml"
def read_profile_meta(profile_dir: Path) -> dict:
"""Read ``<profile_dir>/profile.yaml`` and return a dict.
Returns ``{"description": "", "description_auto": False}`` when the
file is missing or unreadable. Never raises a corrupt
profile.yaml on an unrelated profile must not break
``hermes profile list``.
"""
path = _profile_yaml_path(profile_dir)
if not path.is_file():
return {"description": "", "description_auto": False}
try:
import yaml
with open(path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
except Exception:
return {"description": "", "description_auto": False}
if not isinstance(data, dict):
return {"description": "", "description_auto": False}
return {
"description": str(data.get("description") or "").strip(),
"description_auto": bool(data.get("description_auto", False)),
}
def write_profile_meta(
profile_dir: Path,
*,
description: Optional[str] = None,
description_auto: Optional[bool] = None,
) -> None:
"""Update ``<profile_dir>/profile.yaml`` in place.
Only the explicitly passed fields are overwritten; unspecified
fields preserve existing values. Creates the file if missing.
Profile directory itself must exist.
"""
if not profile_dir.is_dir():
raise FileNotFoundError(f"profile directory does not exist: {profile_dir}")
import yaml
path = _profile_yaml_path(profile_dir)
existing: dict = {}
if path.is_file():
try:
with open(path, "r", encoding="utf-8") as f:
loaded = yaml.safe_load(f) or {}
if isinstance(loaded, dict):
existing = loaded
except Exception:
existing = {}
if description is not None:
existing["description"] = description.strip()
if description_auto is not None:
existing["description_auto"] = bool(description_auto)
with open(path, "w", encoding="utf-8") as f:
yaml.safe_dump(existing, f, sort_keys=False, default_flow_style=False)
# ---------------------------------------------------------------------------
# CRUD operations
# ---------------------------------------------------------------------------
@ -493,6 +580,7 @@ def list_profiles() -> List[ProfileInfo]:
if default_home.is_dir():
model, provider = _read_config_model(default_home)
dist_name, dist_version, dist_source = _read_distribution_meta(default_home)
meta = read_profile_meta(default_home)
profiles.append(ProfileInfo(
name="default",
path=default_home,
@ -505,6 +593,8 @@ def list_profiles() -> List[ProfileInfo]:
distribution_name=dist_name,
distribution_version=dist_version,
distribution_source=dist_source,
description=meta.get("description", ""),
description_auto=meta.get("description_auto", False),
))
# Named profiles
@ -519,6 +609,7 @@ def list_profiles() -> List[ProfileInfo]:
model, provider = _read_config_model(entry)
alias_path = wrapper_dir / name
dist_name, dist_version, dist_source = _read_distribution_meta(entry)
meta = read_profile_meta(entry)
profiles.append(ProfileInfo(
name=name,
path=entry,
@ -532,6 +623,8 @@ def list_profiles() -> List[ProfileInfo]:
distribution_name=dist_name,
distribution_version=dist_version,
distribution_source=dist_source,
description=meta.get("description", ""),
description_auto=meta.get("description_auto", False),
))
return profiles
@ -544,6 +637,7 @@ def create_profile(
clone_config: bool = False,
no_alias: bool = False,
no_skills: bool = False,
description: Optional[str] = None,
) -> Path:
"""Create a new profile directory.
@ -667,6 +761,19 @@ def create_profile(
except OSError:
pass # best-effort — the feature still works via the empty skills/ dir
# Persist description if the caller provided one. Done last so a
# partial-create failure doesn't strand a description file in an
# incomplete profile.
if description and description.strip():
try:
write_profile_meta(
profile_dir,
description=description.strip(),
description_auto=False,
)
except Exception:
pass # non-fatal — user can describe later with `hermes profile describe`
return profile_dir