feat(kanban): auto-subscribe calling session on kanban_create

When a worker calls kanban_create from inside a session that has a
persistent delivery channel, the originating session is now subscribed
to the new task's completion/block events automatically. The agent
that dispatched the task gets notified instead of having to poll.

- Gateway sessions (telegram/discord/slack): HERMES_SESSION_PLATFORM +
  HERMES_SESSION_CHAT_ID ContextVars, set by the messaging gateway.
- TUI / desktop sessions: HERMES_SESSION_KEY in the subprocess env.
  The TUI notification poller keys on platform='tui' + chat_id=<key>.
- CLI / cron / test: no persistent channel, no subscription.

Gated by kanban.auto_subscribe_on_create in config.yaml (default True).
Disable to mirror pre-feature behaviour — users who want explicit
kanban_notify-subscribe calls per task can set it to false. This
config gate addresses the design concern that got PR #19718 reverted
upstream (unconditional implicit auto-subscribe on tool-driven
kanban_create was too aggressive for orchestrator users).

HERMES_SESSION_ID is intentionally not a fallback channel — it is
set by ACP/agent subprocess telemetry for every invocation, not just
TUI, so treating it as a notification target would auto-subscribe
every CLI session and re-introduce the over-eager behaviour.

The kanban_create response now includes a 'subscribed' bool so
orchestrators can react if subscription failed (e.g. by falling
back to explicit kanban_notify-subscribe or to polling).

Includes 6 tests covering the gateway / TUI / CLI / partial-context /
gated / add_notify_sub-failure paths. All 90 tests in
test_kanban_tools.py pass; 509 broader kanban tests pass.
This commit is contained in:
flooryyyy 2026-06-15 17:04:04 +01:00 committed by Teknium
parent 1ea2b27993
commit f8d8f045fa
5 changed files with 306 additions and 0 deletions

View file

@ -1271,6 +1271,21 @@ DEFAULT_CONFIG = {
# global threshold regardless.
},
# Kanban subsystem (orchestrator workers + dispatcher-driven child tasks).
# See tools/kanban_tools.py and hermes_cli/kanban_db.py for the actual
# implementations. Per-platform notification opt-out is handled by the
# kanban dashboard (see ``hermes dashboard`` -> Notifications).
"kanban": {
# Auto-subscribe the originating gateway/TUI session to task
# completion + block events when ``kanban_create`` is called from
# inside a session that has a persistent delivery channel. The
# agent that dispatched the task will get notified automatically
# instead of having to poll. Disable to mirror pre-feature
# behaviour — e.g. for a profile that prefers explicit
# ``kanban_notify-subscribe`` calls per task.
"auto_subscribe_on_create": True,
},
# Anthropic prompt caching (Claude via OpenRouter or native Anthropic API).
# cache_ttl must be "5m" or "1h" (Anthropic-supported tiers); other values are ignored.
"prompt_caching": {

View file

@ -1812,3 +1812,193 @@ def test_board_param_in_all_schemas():
assert "board" not in schema["parameters"].get("required", []), (
f"{schema['name']} marks board as required; must be optional"
)
# ---------------------------------------------------------------------------
# kanban_create auto-subscribe behaviour
#
# When a worker calls kanban_create from inside a session that has a
# persistent delivery channel, the originating session should be
# subscribed to the new task's completion/block events automatically.
# - Gateway sessions: HERMES_SESSION_PLATFORM + HERMES_SESSION_CHAT_ID set.
# - TUI sessions: HERMES_SESSION_KEY (or HERMES_SESSION_ID) set, with
# the platform/chat_id ContextVars intentionally empty.
# - CLI / cron / test sessions: no delivery channel -> no subscription.
# - Config gate kanban.auto_subscribe_on_create: false -> no subscription
# even when the session has a delivery channel.
# ---------------------------------------------------------------------------
def _list_subs_for_task(task_id):
from hermes_cli import kanban_db as kb
conn = kb.connect()
try:
return list(kb.list_notify_subs(conn, task_id))
finally:
conn.close()
def _sub_index(subs):
"""Normalise a list of notify-subs (dicts or objects) into dicts
keyed by platform+chat_id, so assertions work regardless of the
return shape."""
out = []
for s in subs:
if isinstance(s, dict):
out.append(s)
else:
out.append({
"platform": getattr(s, "platform", None),
"chat_id": getattr(s, "chat_id", None),
"thread_id": getattr(s, "thread_id", None),
"user_id": getattr(s, "user_id", None),
})
return out
def test_create_subscribes_gateway_session(monkeypatch, worker_env):
"""A gateway session (platform + chat_id set) gets auto-subscribed
to its own kanban_create result, and the response surfaces the
``subscribed`` flag so the orchestrator can react."""
from tools import kanban_tools as kt
monkeypatch.setenv("HERMES_SESSION_PLATFORM", "telegram")
monkeypatch.setenv("HERMES_SESSION_CHAT_ID", "chat-42")
monkeypatch.setenv("HERMES_SESSION_THREAD_ID", "thread-7")
monkeypatch.setenv("HERMES_SESSION_USER_ID", "user-9")
out = kt._handle_create({
"title": "auto-sub gateway",
"assignee": "peer",
})
d = json.loads(out)
assert d["ok"] is True
new_tid = d["task_id"]
assert d["subscribed"] is True, d
subs = _sub_index(_list_subs_for_task(new_tid))
assert len(subs) == 1
s = subs[0]
assert s["platform"] == "telegram"
assert s["chat_id"] == "chat-42"
assert s["thread_id"] == "thread-7"
assert s["user_id"] == "user-9"
def test_create_subscribes_tui_session_via_session_key(monkeypatch, worker_env):
"""TUI / desktop sessions don't have a platform/chat_id (single
local channel), but the parent process exports HERMES_SESSION_KEY.
We should still auto-subscribe, with platform='tui' and
chat_id=<key>."""
from tools import kanban_tools as kt
monkeypatch.delenv("HERMES_SESSION_PLATFORM", raising=False)
monkeypatch.delenv("HERMES_SESSION_CHAT_ID", raising=False)
monkeypatch.delenv("HERMES_SESSION_THREAD_ID", raising=False)
monkeypatch.delenv("HERMES_SESSION_USER_ID", raising=False)
monkeypatch.setenv("HERMES_SESSION_KEY", "tui-session-abc")
monkeypatch.delenv("HERMES_SESSION_ID", raising=False)
out = kt._handle_create({
"title": "auto-sub tui",
"assignee": "peer",
})
d = json.loads(out)
assert d["ok"] is True
new_tid = d["task_id"]
assert d["subscribed"] is True, d
subs = _sub_index(_list_subs_for_task(new_tid))
assert len(subs) == 1
assert subs[0]["platform"] == "tui"
assert subs[0]["chat_id"] == "tui-session-abc"
def test_create_does_not_subscribe_in_cli_session(monkeypatch, worker_env):
"""CLI / cron / test sessions have no persistent delivery channel.
_maybe_auto_subscribe returns False and no row is written."""
from tools import kanban_tools as kt
monkeypatch.delenv("HERMES_SESSION_PLATFORM", raising=False)
monkeypatch.delenv("HERMES_SESSION_CHAT_ID", raising=False)
monkeypatch.delenv("HERMES_SESSION_KEY", raising=False)
monkeypatch.delenv("HERMES_SESSION_ID", raising=False)
out = kt._handle_create({
"title": "no sub cli",
"assignee": "peer",
})
d = json.loads(out)
assert d["ok"] is True
assert d["subscribed"] is False, d
assert _list_subs_for_task(d["task_id"]) == []
def test_create_respects_auto_subscribe_on_create_false(monkeypatch, worker_env, tmp_path):
"""The config gate kanban.auto_subscribe_on_create=false must
suppress auto-subscription even when the session has a delivery
channel. This is the knob that addresses the upstream design
concern from PR #19718 (reverted in #19721) — users who want
explicit kanban_notify-subscribe calls per task get that."""
# worker_env already created <tmp>/.hermes; use a fresh sibling
# home to avoid mkdir() colliding with the worker's directory.
home = tmp_path / "gate-home" / ".hermes"
home.mkdir(parents=True)
(home / "config.yaml").write_text(
"kanban:\n auto_subscribe_on_create: false\n"
)
monkeypatch.setenv("HERMES_HOME", str(home))
monkeypatch.setenv("HERMES_SESSION_PLATFORM", "discord")
monkeypatch.setenv("HERMES_SESSION_CHAT_ID", "channel-1")
from tools import kanban_tools as kt
out = kt._handle_create({
"title": "no sub gated",
"assignee": "peer",
})
d = json.loads(out)
assert d["ok"] is True
assert d["subscribed"] is False, d
assert _list_subs_for_task(d["task_id"]) == []
def test_create_partial_session_context_no_subscribe(monkeypatch, worker_env):
"""Only one of (platform, chat_id) set -> no implicit subscribe.
Either both are set (gateway) or neither (TUI / CLI); partial is
ambiguous and the safe default is to skip."""
from tools import kanban_tools as kt
monkeypatch.setenv("HERMES_SESSION_PLATFORM", "slack")
monkeypatch.delenv("HERMES_SESSION_CHAT_ID", raising=False)
monkeypatch.delenv("HERMES_SESSION_KEY", raising=False)
monkeypatch.delenv("HERMES_SESSION_ID", raising=False)
out = kt._handle_create({
"title": "no sub partial",
"assignee": "peer",
})
d = json.loads(out)
assert d["ok"] is True
assert d["subscribed"] is False, d
def test_maybe_auto_subscribe_swallows_add_notify_sub_failure(monkeypatch, worker_env):
"""If add_notify_sub itself raises (e.g. DB locked, schema drift),
_maybe_auto_subscribe must NOT bubble that up and fail the parent
kanban_create. The function returns False and the parent create
still succeeds with subscribed=False."""
from tools import kanban_tools as kt
monkeypatch.setenv("HERMES_SESSION_PLATFORM", "telegram")
monkeypatch.setenv("HERMES_SESSION_CHAT_ID", "chat-42")
from hermes_cli import kanban_db as kb
def _boom(*a, **kw):
raise RuntimeError("simulated DB failure")
monkeypatch.setattr(kb, "add_notify_sub", _boom)
out = kt._handle_create({
"title": "auto-sub tolerates add_notify_sub failure",
"assignee": "peer",
})
d = json.loads(out)
assert d["ok"] is True, d
assert d["subscribed"] is False, d

View file

@ -34,6 +34,7 @@ import os
from typing import Any, Optional
from tools.registry import registry, tool_error
from hermes_cli.config import cfg_get, load_config
logger = logging.getLogger(__name__)
@ -818,9 +819,11 @@ def _handle_create(args: dict, **kw) -> str:
session_id=session_id,
)
new_task = kb.get_task(conn, new_tid)
subscribed = _maybe_auto_subscribe(conn, new_tid)
return _ok(
task_id=new_tid,
status=new_task.status if new_task else None,
subscribed=subscribed,
)
finally:
conn.close()
@ -831,6 +834,102 @@ def _handle_create(args: dict, **kw) -> str:
return tool_error(f"kanban_create: {e}")
def _maybe_auto_subscribe(conn: Any, task_id: str) -> bool:
"""Auto-subscribe the calling session to task completion / block events.
Returns True if a subscription row was written, False otherwise (no
session context, config gate disabled, or best-effort failure). The
caller surfaces this in the ``subscribed`` field of the kanban_create
response so an orchestrator can decide whether to fall back to an
explicit ``kanban_notify-subscribe`` or to polling.
Gated by ``kanban.auto_subscribe_on_create`` in config.yaml (default
True). Disable to mirror pre-feature behaviour, e.g. when the
originating user/chat opted out via the per-platform notification
toggle (see ``hermes dashboard``).
Subscription paths:
- **Gateway** (telegram/discord/slack/etc): ``HERMES_SESSION_PLATFORM``
and ``HERMES_SESSION_CHAT_ID`` are set in ContextVars by the
messaging gateway before agent dispatch. The notification poller
already keys off these, so we just register a row.
- **TUI** (herm desktop / herm TUI): the platform/chat_id ContextVars
are intentionally cleared (TUI is a single-channel local UI, not
a multi-tenant chat surface), but the agent subprocess inherits
``HERMES_SESSION_KEY`` from the parent session. We subscribe with
``platform="tui"`` and ``chat_id=<key>``; the TUI notification
poller (``tui_gateway/server.py``) reads ``kanban_notify_subs``
for these rows and posts the completion message into the running
session.
- **CLI / cron / test / unattached**: no persistent delivery channel,
no-op.
Failure mode: any exception inside the function is logged at WARNING
with the offending exception + diagnostic env vars and swallowed.
We never want a notification bookkeeping failure to fail the
kanban_create that the agent is mid-conversation about.
"""
try:
cfg = load_config()
if not cfg_get(cfg, "kanban", "auto_subscribe_on_create", default=True):
return False
except Exception:
# If config can't load we still default to True — this is the
# user-friendly behaviour that mirrors the pre-gate implementation.
pass
platform = ""
chat_id = ""
try:
from gateway.session_context import get_session_env
platform = get_session_env("HERMES_SESSION_PLATFORM", "")
chat_id = get_session_env("HERMES_SESSION_CHAT_ID", "")
if not platform or not chat_id:
# TUI / desktop fallback: platform/chat_id ContextVars are
# cleared for TUI sessions, but the parent process exports
# HERMES_SESSION_KEY into the subprocess env. Treat that
# as a "tui" subscription so the TUI notification poller
# (tui_gateway/server.py) can pick it up.
#
# HERMES_SESSION_ID is intentionally NOT a fallback here:
# it is set by ACP / the agent subprocess for telemetry
# regardless of whether the parent is a TUI or a CLI, so
# treating it as a notification target would auto-subscribe
# every CLI invocation, which is exactly the over-eager
# behaviour that got #19718 reverted upstream. The TUI
# poller keys on HERMES_SESSION_KEY.
session_key = (
get_session_env("HERMES_SESSION_KEY", "")
or os.environ.get("HERMES_SESSION_KEY", "")
)
if not session_key:
return False # CLI / cron / test — no persistent channel
platform = "tui"
chat_id = session_key
thread_id = get_session_env("HERMES_SESSION_THREAD_ID", "") or None
user_id = get_session_env("HERMES_SESSION_USER_ID", "") or None
notifier_profile = os.environ.get("HERMES_PROFILE")
# Lazy-import to keep the module-level dependency light
from hermes_cli import kanban_db as _kb
_kb.add_notify_sub(
conn, task_id=task_id,
platform=platform, chat_id=chat_id,
thread_id=thread_id, user_id=user_id,
notifier_profile=notifier_profile,
)
return True
except Exception as _exc:
logger.warning(
"_maybe_auto_subscribe failed: %r (platform=%r key_set=%r)",
_exc, platform, bool(chat_id),
)
return False
def _handle_unblock(args: dict, **kw) -> str:
"""Transition a blocked task back to ready."""
guard = _require_orchestrator_tool("kanban_unblock")

View file

@ -535,6 +535,7 @@ Config knobs (all under `kanban:` in `~/.hermes/config.yaml`):
| `auto_decompose_per_tick` | `3` | Cap on decompositions per dispatcher tick. Excess defers to the next tick. |
| `orchestrator_profile` | `""` | Profile assigned to the root/orchestration task after decomposition. Empty = fall back to active default profile. |
| `default_assignee` | `""` | Where a child task lands when the LLM picks an unknown profile. Empty = fall back to active default. |
| `auto_subscribe_on_create` | `true` | When a worker calls `kanban_create` from inside a session with a persistent delivery channel (messaging gateway or TUI), the originating session is auto-subscribed to the new task's completion/block events. The dispatcher still drives the delivery — this only changes whether the caller's chat/key shows up in the notify-sub table. Set to `false` to require explicit `kanban_notify-subscribe` calls per task. |
And the two auxiliary LLM slots:

View file

@ -431,6 +431,7 @@ hermes dashboard # 导航栏中出现 "Kanban" 标签页,位于 "Skills
| `auto_decompose_per_tick` | `3` | 每个调度器 tick 的分解上限。超出部分推迟到下一个 tick。 |
| `orchestrator_profile` | `""` | 拥有分解权的配置文件。空 = 回退到活动默认配置文件。 |
| `default_assignee` | `""` | LLM 选择未知配置文件时子任务的落地位置。空 = 回退到活动默认配置文件。 |
| `auto_subscribe_on_create` | `true` | 当 worker 在具有持久投递通道的会话(消息网关或 TUI内调用 `kanban_create` 时,原始会话会自动订阅新任务的完成/阻塞事件。调度器仍负责驱动投递 —— 此设置只决定调用者的聊天/密钥是否出现在通知订阅表中。设为 `false` 则要求对每个任务显式调用 `kanban_notify-subscribe`。 |
以及两个辅助 LLM 槽: