feat(kanban): stamp originating ACP session_id on tasks

Salvages #23208 by @awizemann. Tracks which chat session created a
kanban task so clients can render a per-session board without falling
back to tenant + time-window heuristics.

- Schema: tasks gains nullable session_id TEXT column with index
  (additive migration in _migrate_add_optional_columns).
- ACP: server.py exposes the originating session id via HERMES_SESSION_ID
  with save/restore around the agent loop.
- Tool: kanban_create reads HERMES_SESSION_ID (with explicit override).
- CLI: 'hermes kanban list --session <id>' filter; JSON output exposes
  session_id.
This commit is contained in:
awizemann 2026-05-18 21:15:15 -07:00 committed by Teknium
parent 8e193cf05c
commit 31fe229039
8 changed files with 321 additions and 5 deletions

View file

@ -1407,9 +1407,10 @@ class HermesACPAgent(acp.Agent):
previous_approval_cb = None
previous_interactive = None
edit_approval_token = None
previous_session_id = None
def _run_agent() -> dict:
nonlocal previous_approval_cb, previous_interactive, edit_approval_token
nonlocal previous_approval_cb, previous_interactive, edit_approval_token, previous_session_id
# Bind HERMES_SESSION_KEY for this session so per-session caches
# (e.g. the interactive sudo password cache in tools.terminal_tool)
# scope to the ACP session rather than leaking across sessions
@ -1444,6 +1445,13 @@ class HermesACPAgent(acp.Agent):
# and the non-interactive auto-approve path must not fire.
previous_interactive = os.environ.get("HERMES_INTERACTIVE")
os.environ["HERMES_INTERACTIVE"] = "1"
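# Propagate the originating ACP session id to tools that want to
# tag side-effects with it (e.g. ``kanban_create`` stamps it on
# the new task so clients can render a per-session board). Save
# and restore around the agent call so a re-used executor thread
# never leaks one session's id into the next session's tools.
# NOTE(review): ``os.environ`` is process-wide rather than per-thread,
# so save/restore only isolates sessions that run one after another;
# confirm that agent loops never overlap within a single process.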
previous_session_id = os.environ.get("HERMES_SESSION_ID")
os.environ["HERMES_SESSION_ID"] = session_id
try:
result = agent.run_conversation(
user_message=user_content,
@ -1461,6 +1469,11 @@ class HermesACPAgent(acp.Agent):
os.environ.pop("HERMES_INTERACTIVE", None)
else:
os.environ["HERMES_INTERACTIVE"] = previous_interactive
# Restore HERMES_SESSION_ID symmetrically.
if previous_session_id is None:
os.environ.pop("HERMES_SESSION_ID", None)
else:
os.environ["HERMES_SESSION_ID"] = previous_session_id
if approval_cb:
try:
from tools import terminal_tool as _terminal_tool

View file

@ -73,6 +73,7 @@ def _task_to_dict(t: kb.Task) -> dict[str, Any]:
"result": t.result,
"skills": list(t.skills) if t.skills else [],
"max_retries": t.max_retries,
"session_id": t.session_id,
}
@ -343,6 +344,9 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu
p_list.add_argument("--status", default=None,
choices=sorted(kb.VALID_STATUSES))
p_list.add_argument("--tenant", default=None)
p_list.add_argument("--session", default=None,
help="Filter by originating chat/agent session id "
"(set on tasks created from inside an ACP loop)")
p_list.add_argument("--archived", action="store_true",
help="Include archived tasks")
p_list.add_argument("--json", action="store_true")
@ -1279,6 +1283,7 @@ def _cmd_list(args: argparse.Namespace) -> int:
assignee=assignee,
status=args.status,
tenant=args.tenant,
session_id=args.session,
include_archived=args.archived,
order_by=getattr(args, "sort", None),
)

View file

@ -649,6 +649,12 @@ class Task:
# ``kanban.failure_limit`` config, and then to ``DEFAULT_FAILURE_LIMIT``.
# Name matches the ``--max-retries`` CLI flag on ``kanban create``.
max_retries: Optional[int] = None
# Originating chat/agent session id, when the task was created from
# within an agent loop that propagated ``HERMES_SESSION_ID``. NULL for
# tasks created from the CLI, the dashboard, or any path that doesn't
# set the env var. Lets clients render a per-session board without
# relying on tenant + time-window heuristics.
session_id: Optional[str] = None
@classmethod
def from_row(cls, row: sqlite3.Row) -> "Task":
@ -714,6 +720,9 @@ class Task:
max_retries=(
row["max_retries"] if "max_retries" in keys else None
),
session_id=(
row["session_id"] if "session_id" in keys else None
),
)
@ -844,9 +853,17 @@ CREATE TABLE IF NOT EXISTS tasks (
-- ``max_retries=1`` blocks on the first failure. NULL (the common
-- case) falls through to the dispatcher-level ``kanban.failure_limit``
-- config and then ``DEFAULT_FAILURE_LIMIT``.
max_retries INTEGER
max_retries INTEGER,
-- Originating chat/agent session id when the task was created from
-- inside an agent loop that propagated ``HERMES_SESSION_ID``. NULL
-- for tasks created from the CLI, dashboard, or any path that doesn't
-- set the env var. Indexed so per-session list queries stay cheap on
-- larger boards.
session_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_tasks_session_id ON tasks(session_id);
CREATE TABLE IF NOT EXISTS task_links (
parent_id TEXT NOT NULL,
child_id TEXT NOT NULL,
@ -1143,6 +1160,20 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
if "model_override" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN model_override TEXT")
if "session_id" not in cols:
# Originating agent/chat session id, populated when the task is
# created from within an agent loop that propagated
# ``HERMES_SESSION_ID`` (e.g. ACP). NULL on legacy rows and on any
# creation path that doesn't set the env var (CLI, dashboard).
# Index keeps per-session list queries cheap.
_add_column_if_missing(
conn, "tasks", "session_id", "session_id TEXT"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_tasks_session_id "
"ON tasks(session_id)"
)
# task_events gained a run_id column; back-fill it as NULL for
# historical events (they predate runs and can't be attributed).
ev_cols = {row["name"] for row in conn.execute("PRAGMA table_info(task_events)")}
@ -1312,6 +1343,7 @@ def create_task(
skills: Optional[Iterable[str]] = None,
max_retries: Optional[int] = None,
initial_status: str = "running",
session_id: Optional[str] = None,
board: Optional[str] = None,
) -> str:
"""Create a new task and optionally link it under parent tasks.
@ -1466,8 +1498,8 @@ def create_task(
id, title, body, assignee, status, priority,
created_by, created_at, workspace_kind, workspace_path,
tenant, idempotency_key, max_runtime_seconds, skills,
max_retries
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
max_retries, session_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
task_id,
@ -1485,6 +1517,7 @@ def create_task(
int(max_runtime_seconds) if max_runtime_seconds is not None else None,
json.dumps(skills_list) if skills_list is not None else None,
int(max_retries) if max_retries is not None else None,
session_id,
),
)
for pid in parents:
@ -1551,6 +1584,7 @@ def list_tasks(
assignee: Optional[str] = None,
status: Optional[str] = None,
tenant: Optional[str] = None,
session_id: Optional[str] = None,
include_archived: bool = False,
limit: Optional[int] = None,
order_by: Optional[str] = None,
@ -1568,6 +1602,9 @@ def list_tasks(
if tenant is not None:
query += " AND tenant = ?"
params.append(tenant)
if session_id is not None:
query += " AND session_id = ?"
params.append(session_id)
if not include_archived and status != "archived":
query += " AND status != 'archived'"
if order_by is not None:

View file

@ -1090,6 +1090,80 @@ class TestPrompt:
]
assert any(update.session_update == "agent_message_chunk" for update in updates)
@pytest.mark.asyncio
async def test_prompt_propagates_hermes_session_id_env(self, agent, monkeypatch):
    """``HERMES_SESSION_ID`` carries the ACP session id into the agent loop.

    Tools such as ``kanban_create`` read the variable from inside
    ``run_conversation`` to stamp side-effects with the originating
    session. The value has to be visible while the agent call runs and
    put back to its earlier state once the call returns, so that a
    re-used executor thread cannot hand one session's id to another.
    """
    # Start from an environment that does not define the variable.
    monkeypatch.delenv("HERMES_SESSION_ID", raising=False)

    session = await agent.new_session(cwd=".")
    session_state = agent.session_manager.get_session(session.session_id)
    seen: dict[str, str | None] = {}

    def fake_run_conversation(user_message, conversation_history=None, task_id=None, **kwargs):
        # Snapshot what the agent loop observes. ``task_id`` is expected
        # to equal the session id at this boundary too, so record both.
        seen["env"] = os.environ.get("HERMES_SESSION_ID")
        seen["task_id"] = task_id
        return {"final_response": "ok", "messages": []}

    session_state.agent.run_conversation = fake_run_conversation

    client = MagicMock(spec=acp.Client)
    client.session_update = AsyncMock()
    agent._conn = client

    await agent.prompt(
        prompt=[TextContentBlock(type="text", text="hi")],
        session_id=session.session_id,
    )

    assert seen["env"] == session.session_id, (
        "HERMES_SESSION_ID must be set to the originating ACP session id "
        "while the agent loop is running"
    )
    assert seen["task_id"] == session.session_id
    # Afterwards the variable must be back in its prior (unset) state.
    assert os.environ.get("HERMES_SESSION_ID") is None, (
        "HERMES_SESSION_ID must be restored after the agent call so "
        "a re-used executor thread doesn't leak the id into the next "
        "session's tools"
    )
@pytest.mark.asyncio
async def test_prompt_restores_prior_hermes_session_id(self, agent, monkeypatch):
    """A pre-existing ``HERMES_SESSION_ID`` survives an inner prompt.

    When the variable is already set before the prompt runs (e.g. nested
    agent loops), the earlier value must be put back once the inner
    prompt completes -- neither removed nor left at the inner id.
    """
    monkeypatch.setenv("HERMES_SESSION_ID", "outer-sess")

    session = await agent.new_session(cwd=".")
    session_state = agent.session_manager.get_session(session.session_id)
    seen: dict[str, str | None] = {}

    def fake_run_conversation(*args, **kwargs):
        # Record the value visible from inside the agent call.
        seen["inner"] = os.environ.get("HERMES_SESSION_ID")
        return {"final_response": "ok", "messages": []}

    session_state.agent.run_conversation = fake_run_conversation

    client = MagicMock(spec=acp.Client)
    client.session_update = AsyncMock()
    agent._conn = client

    await agent.prompt(
        prompt=[TextContentBlock(type="text", text="hi")],
        session_id=session.session_id,
    )

    assert seen["inner"] == session.session_id
    # The value that was set before the prompt must be back in place.
    assert os.environ.get("HERMES_SESSION_ID") == "outer-sess"
@pytest.mark.asyncio
async def test_prompt_does_not_duplicate_streamed_final_message(self, agent):
"""If ACP already streamed response chunks, final_response should not be sent again."""

View file

@ -156,6 +156,48 @@ def test_run_slash_tenant_filter(kanban_home):
assert "biz-b task" in b and "biz-a task" not in b
def test_run_slash_session_filter(kanban_home):
    """``list --session <id>`` shows only tasks stamped with that session id.

    Seeds two tasks for ``sess-1``, one for ``sess-2`` and one without
    any session id, then checks each filtered listing.
    """
    from hermes_cli import kanban_db as kb

    seeded = (
        ("from sess-1 a", "sess-1"),
        ("from sess-1 b", "sess-1"),
        ("from sess-2", "sess-2"),
    )
    with kb.connect() as conn:
        for title, sid in seeded:
            kb.create_task(conn, title=title, assignee="alice", session_id=sid)
        # One task with no session id at all.
        kb.create_task(conn, title="cli only", assignee="alice")

    first = kc.run_slash("list --session sess-1")
    second = kc.run_slash("list --session sess-2")

    for shown in ("from sess-1 a", "from sess-1 b"):
        assert shown in first
    for hidden in ("from sess-2", "cli only"):
        assert hidden not in first
    assert "from sess-2" in second
    assert "from sess-1 a" not in second
def test_kanban_list_json_includes_session_id(kanban_home):
    """``list --json`` rows carry ``session_id``.

    External clients (Scarf, web dashboards) can then filter by chat
    session without issuing a side query.
    """
    from hermes_cli import kanban_db as kb

    with kb.connect() as conn:
        kb.create_task(conn, title="acp task", assignee="alice", session_id="acp-x")

    rows = json.loads(kc.run_slash("list --json"))
    matching = [
        row
        for row in rows
        if row.get("title") == "acp task" and row.get("session_id") == "acp-x"
    ]
    assert matching
def test_run_slash_usage_error_returns_message(kanban_home):
# Missing required argument for create
out = kc.run_slash("create")

View file

@ -1035,6 +1035,76 @@ def test_tenant_propagates_to_events(kanban_home):
assert created and created[0].payload.get("tenant") == "biz-a"
# ---------------------------------------------------------------------------
# Originating session id (ACP propagation)
# ---------------------------------------------------------------------------
def test_create_task_stamps_session_id(kanban_home):
    """A ``session_id`` passed to ``create_task`` is readable back from the task."""
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="from chat", session_id="acp-sess-123")
        stored = kb.get_task(conn, task_id)
        assert stored is not None
        assert stored.session_id == "acp-sess-123"
def test_create_task_session_id_defaults_to_none(kanban_home):
    """Omitting ``session_id`` on ``create_task`` leaves the field as ``None``."""
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="cli-created")
        stored = kb.get_task(conn, task_id)
        assert stored is not None
        assert stored.session_id is None
def test_session_id_filters_listings(kanban_home):
    """``list_tasks(session_id=...)`` narrows results; no filter returns all rows."""
    with kb.connect() as conn:
        for title, sid in (("s1-a", "sess-1"), ("s1-b", "sess-1"), ("s2-a", "sess-2")):
            kb.create_task(conn, title=title, session_id=sid)
        kb.create_task(conn, title="cli-only")  # no session

        first_session = kb.list_tasks(conn, session_id="sess-1")
        second_session = kb.list_tasks(conn, session_id="sess-2")
        everything = kb.list_tasks(conn)

        first_titles = sorted(task.title for task in first_session)
        second_titles = [task.title for task in second_session]
        assert first_titles == ["s1-a", "s1-b"]
        assert second_titles == ["s2-a"]
        # Without a session filter every row, including the one with no
        # session id, is still returned.
        assert len(everything) == 4
def test_session_id_index_exists(kanban_home):
    """The ``tasks`` table has an ``idx_tasks_session_id`` index.

    The index keeps per-session list queries cheap on busy boards;
    without it a chat-scoped poll would full-scan the tasks table.
    """
    with kb.connect() as conn:
        index_names = {
            row["name"]
            for row in conn.execute(
                "SELECT name FROM sqlite_master WHERE type='index' "
                "AND tbl_name='tasks'"
            ).fetchall()
        }
        assert "idx_tasks_session_id" in index_names
def test_session_id_compose_with_tenant_filter(kanban_home):
    """``tenant`` and ``session_id`` filters combine with AND.

    A client may ask for both ``tenant=scarf:foo`` and ``session=acp-x``;
    only the task matching both may come back.
    """
    seeded = (
        ("match", "scarf:foo", "acp-x"),
        ("wrong-tenant", "other", "acp-x"),
        ("wrong-session", "scarf:foo", "acp-y"),
    )
    with kb.connect() as conn:
        for title, tenant, sid in seeded:
            kb.create_task(conn, title=title, tenant=tenant, session_id=sid)

        found = kb.list_tasks(conn, tenant="scarf:foo", session_id="acp-x")
        titles = [task.title for task in found]
        assert titles == ["match"]
# ---------------------------------------------------------------------------
# Shared-board path resolution (issue #19348)
#
@ -1557,7 +1627,8 @@ def test_migrate_add_optional_columns_tolerates_concurrent_migration(kanban_home
workflow_template_id TEXT,
current_step_key TEXT,
skills TEXT,
max_retries INTEGER
max_retries INTEGER,
session_id TEXT
)
"""
)

View file

@ -768,6 +768,75 @@ def test_create_happy_path(worker_env):
conn.close()
def test_create_stamps_session_id_from_env(monkeypatch, worker_env):
    """``kanban_create`` stamps ``HERMES_SESSION_ID`` on the new task.

    Under ACP the server exports the originating chat session id through
    that variable; the create handler picks it up so clients can render
    a per-session board (issue: ACP session linkage on kanban tasks).
    """
    monkeypatch.setenv("HERMES_SESSION_ID", "acp-sess-abc")
    from tools import kanban_tools as kt
    from hermes_cli import kanban_db as kb

    request = {
        "title": "from chat",
        "assignee": "peer",
        "parents": [worker_env],
    }
    reply = json.loads(kt._handle_create(request))
    assert reply["ok"] is True

    conn = kb.connect()
    try:
        stored = kb.get_task(conn, reply["task_id"])
        assert stored.session_id == "acp-sess-abc"
    finally:
        conn.close()
def test_create_session_id_arg_overrides_env(monkeypatch, worker_env):
    """An explicit ``session_id`` argument beats the environment value.

    Edge case but exercised: a tool call may carry a different session
    id (e.g. cross-session linking), and that explicit argument must not
    be silently replaced by ``HERMES_SESSION_ID``.
    """
    monkeypatch.setenv("HERMES_SESSION_ID", "from-env")
    from tools import kanban_tools as kt
    from hermes_cli import kanban_db as kb

    request = {
        "title": "explicit override",
        "assignee": "peer",
        "parents": [worker_env],
        "session_id": "explicit-arg",
    }
    reply = json.loads(kt._handle_create(request))
    assert reply["ok"] is True

    conn = kb.connect()
    try:
        stored = kb.get_task(conn, reply["task_id"])
        assert stored.session_id == "explicit-arg"
    finally:
        conn.close()
def test_create_session_id_absent_when_env_unset(monkeypatch, worker_env):
    """With neither the env var nor an argument, ``session_id`` stays NULL.

    This matters for backwards compatibility: hosts that predate the ACP
    propagation and CLI-driven creates must not pick up a stale id.
    """
    monkeypatch.delenv("HERMES_SESSION_ID", raising=False)
    from tools import kanban_tools as kt
    from hermes_cli import kanban_db as kb

    request = {
        "title": "no session",
        "assignee": "peer",
        "parents": [worker_env],
    }
    reply = json.loads(kt._handle_create(request))
    assert reply["ok"] is True

    conn = kb.connect()
    try:
        stored = kb.get_task(conn, reply["task_id"])
        assert stored.session_id is None
    finally:
        conn.close()
def test_create_rejects_no_title(worker_env):
from tools import kanban_tools as kt
assert json.loads(kt._handle_create({"assignee": "x"})).get("error")

View file

@ -654,6 +654,10 @@ def _handle_create(args: dict, **kw) -> str:
body = args.get("body")
parents = args.get("parents") or []
tenant = args.get("tenant") or os.environ.get("HERMES_TENANT")
# Stamp the originating session id when the agent loop runs under
# ACP (which sets HERMES_SESSION_ID before invoking tools). NULL on
# CLI / dashboard paths and on legacy hosts that don't set the env.
session_id = args.get("session_id") or os.environ.get("HERMES_SESSION_ID")
priority = args.get("priority")
workspace_kind = args.get("workspace_kind") or "scratch"
workspace_path = args.get("workspace_path")
@ -700,6 +704,7 @@ def _handle_create(args: dict, **kw) -> str:
skills=skills,
initial_status=str(initial_status),
created_by=os.environ.get("HERMES_PROFILE") or "worker",
session_id=session_id,
)
new_task = kb.get_task(conn, new_tid)
return _ok(