diff --git a/acp_adapter/server.py b/acp_adapter/server.py index 26d5809d1a4..fbdee70527a 100644 --- a/acp_adapter/server.py +++ b/acp_adapter/server.py @@ -1407,9 +1407,10 @@ class HermesACPAgent(acp.Agent): previous_approval_cb = None previous_interactive = None edit_approval_token = None + previous_session_id = None def _run_agent() -> dict: - nonlocal previous_approval_cb, previous_interactive, edit_approval_token + nonlocal previous_approval_cb, previous_interactive, edit_approval_token, previous_session_id # Bind HERMES_SESSION_KEY for this session so per-session caches # (e.g. the interactive sudo password cache in tools.terminal_tool) # scope to the ACP session rather than leaking across sessions @@ -1444,6 +1445,13 @@ class HermesACPAgent(acp.Agent): # and the non-interactive auto-approve path must not fire. previous_interactive = os.environ.get("HERMES_INTERACTIVE") os.environ["HERMES_INTERACTIVE"] = "1" + # Propagate the originating ACP session id to tools that want to + # tag side-effects with it (e.g. ``kanban_create`` stamps it on + # the new task so clients can render a per-session board). Save + # and restore around the agent call so a re-used executor thread + # never leaks one session's id into the next session's tools. + previous_session_id = os.environ.get("HERMES_SESSION_ID") + os.environ["HERMES_SESSION_ID"] = session_id try: result = agent.run_conversation( user_message=user_content, @@ -1461,6 +1469,11 @@ class HermesACPAgent(acp.Agent): os.environ.pop("HERMES_INTERACTIVE", None) else: os.environ["HERMES_INTERACTIVE"] = previous_interactive + # Restore HERMES_SESSION_ID symmetrically. + if previous_session_id is None: + os.environ.pop("HERMES_SESSION_ID", None) + else: + os.environ["HERMES_SESSION_ID"] = previous_session_id if approval_cb: try: from tools import terminal_tool as _terminal_tool diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index 2c2e8d3e5e7..01bdd3d8574 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -73,6 +73,7 @@ def _task_to_dict(t: kb.Task) -> dict[str, Any]: "result": t.result, "skills": list(t.skills) if t.skills else [], "max_retries": t.max_retries, + "session_id": t.session_id, } @@ -343,6 +344,9 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu p_list.add_argument("--status", default=None, choices=sorted(kb.VALID_STATUSES)) p_list.add_argument("--tenant", default=None) + p_list.add_argument("--session", default=None, + help="Filter by originating chat/agent session id " + "(set on tasks created from inside an ACP loop)") p_list.add_argument("--archived", action="store_true", help="Include archived tasks") p_list.add_argument("--json", action="store_true") @@ -1279,6 +1283,7 @@ def _cmd_list(args: argparse.Namespace) -> int: assignee=assignee, status=args.status, tenant=args.tenant, + session_id=args.session, include_archived=args.archived, order_by=getattr(args, "sort", None), ) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 7b8137ed25e..02d4be9728a 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -649,6 +649,12 @@ class Task: # ``kanban.failure_limit`` config, and then to ``DEFAULT_FAILURE_LIMIT``. # Name matches the ``--max-retries`` CLI flag on ``kanban create``. max_retries: Optional[int] = None + # Originating chat/agent session id, when the task was created from + # within an agent loop that propagated ``HERMES_SESSION_ID``. NULL for + # tasks created from the CLI, the dashboard, or any path that doesn't + # set the env var. Lets clients render a per-session board without + # relying on tenant + time-window heuristics. + session_id: Optional[str] = None @classmethod def from_row(cls, row: sqlite3.Row) -> "Task": @@ -714,6 +720,9 @@ class Task: max_retries=( row["max_retries"] if "max_retries" in keys else None ), + session_id=( + row["session_id"] if "session_id" in keys else None + ), ) @@ -844,9 +853,17 @@ CREATE TABLE IF NOT EXISTS tasks ( -- ``max_retries=1`` blocks on the first failure. NULL (the common -- case) falls through to the dispatcher-level ``kanban.failure_limit`` -- config and then ``DEFAULT_FAILURE_LIMIT``. - max_retries INTEGER + max_retries INTEGER, + -- Originating chat/agent session id when the task was created from + -- inside an agent loop that propagated ``HERMES_SESSION_ID``. NULL + -- for tasks created from the CLI, dashboard, or any path that doesn't + -- set the env var. Indexed so per-session list queries stay cheap on + -- larger boards. + session_id TEXT ); +CREATE INDEX IF NOT EXISTS idx_tasks_session_id ON tasks(session_id); + CREATE TABLE IF NOT EXISTS task_links ( parent_id TEXT NOT NULL, child_id TEXT NOT NULL, @@ -1143,6 +1160,20 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None: if "model_override" not in cols: conn.execute("ALTER TABLE tasks ADD COLUMN model_override TEXT") + if "session_id" not in cols: + # Originating agent/chat session id, populated when the task is + # created from within an agent loop that propagated + # ``HERMES_SESSION_ID`` (e.g. ACP). NULL on legacy rows and on any + # creation path that doesn't set the env var (CLI, dashboard). + # Index keeps per-session list queries cheap. + _add_column_if_missing( + conn, "tasks", "session_id", "session_id TEXT" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_tasks_session_id " + "ON tasks(session_id)" + ) + # task_events gained a run_id column; back-fill it as NULL for # historical events (they predate runs and can't be attributed). ev_cols = {row["name"] for row in conn.execute("PRAGMA table_info(task_events)")} @@ -1312,6 +1343,7 @@ def create_task( skills: Optional[Iterable[str]] = None, max_retries: Optional[int] = None, initial_status: str = "running", + session_id: Optional[str] = None, board: Optional[str] = None, ) -> str: """Create a new task and optionally link it under parent tasks. @@ -1466,8 +1498,8 @@ def create_task( id, title, body, assignee, status, priority, created_by, created_at, workspace_kind, workspace_path, tenant, idempotency_key, max_runtime_seconds, skills, - max_retries - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + max_retries, session_id + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( task_id, @@ -1485,6 +1517,7 @@ def create_task( int(max_runtime_seconds) if max_runtime_seconds is not None else None, json.dumps(skills_list) if skills_list is not None else None, int(max_retries) if max_retries is not None else None, + session_id, ), ) for pid in parents: @@ -1551,6 +1584,7 @@ def list_tasks( assignee: Optional[str] = None, status: Optional[str] = None, tenant: Optional[str] = None, + session_id: Optional[str] = None, include_archived: bool = False, limit: Optional[int] = None, order_by: Optional[str] = None, @@ -1568,6 +1602,9 @@ def list_tasks( if tenant is not None: query += " AND tenant = ?" params.append(tenant) + if session_id is not None: + query += " AND session_id = ?" + params.append(session_id) if not include_archived and status != "archived": query += " AND status != 'archived'" if order_by is not None: diff --git a/tests/acp/test_server.py b/tests/acp/test_server.py index a1c46a27f6a..c1ff1bf4e63 100644 --- a/tests/acp/test_server.py +++ b/tests/acp/test_server.py @@ -1090,6 +1090,80 @@ class TestPrompt: ] assert any(update.session_update == "agent_message_chunk" for update in updates) + @pytest.mark.asyncio + async def test_prompt_propagates_hermes_session_id_env(self, agent, monkeypatch): + """ACP must propagate the originating session id to the agent loop + via ``HERMES_SESSION_ID`` so tools that want to stamp side-effects + with it (e.g. ``kanban_create``) can read the env var inside + ``run_conversation``. The variable must be visible during the + agent call AND restored afterwards so a re-used executor thread + doesn't leak one session's id into another.""" + # Pre-condition: env is clean. + monkeypatch.delenv("HERMES_SESSION_ID", raising=False) + + new_resp = await agent.new_session(cwd=".") + state = agent.session_manager.get_session(new_resp.session_id) + + captured: dict[str, str | None] = {} + + def mock_run(user_message, conversation_history=None, task_id=None, **kwargs): + # Inside the agent loop the env var must reflect the active + # ACP session id. ``task_id`` is also the session id at this + # boundary; assert both for symmetry. + captured["env"] = os.environ.get("HERMES_SESSION_ID") + captured["task_id"] = task_id + return {"final_response": "ok", "messages": []} + + state.agent.run_conversation = mock_run + + mock_conn = MagicMock(spec=acp.Client) + mock_conn.session_update = AsyncMock() + agent._conn = mock_conn + + prompt = [TextContentBlock(type="text", text="hi")] + await agent.prompt(prompt=prompt, session_id=new_resp.session_id) + + assert captured["env"] == new_resp.session_id, ( + "HERMES_SESSION_ID must be set to the originating ACP session id " + "while the agent loop is running" + ) + assert captured["task_id"] == new_resp.session_id + # Post-condition: must be restored to the prior value (None here). + assert os.environ.get("HERMES_SESSION_ID") is None, ( + "HERMES_SESSION_ID must be restored after the agent call so " + "a re-used executor thread doesn't leak the id into the next " + "session's tools" + ) + + @pytest.mark.asyncio + async def test_prompt_restores_prior_hermes_session_id(self, agent, monkeypatch): + """If the env already had HERMES_SESSION_ID set (e.g. nested + agent loops), the prior value must be restored after the inner + prompt completes — not popped, not left at the inner id.""" + monkeypatch.setenv("HERMES_SESSION_ID", "outer-sess") + + new_resp = await agent.new_session(cwd=".") + state = agent.session_manager.get_session(new_resp.session_id) + + captured: dict[str, str | None] = {} + + def mock_run(*args, **kwargs): + captured["inner"] = os.environ.get("HERMES_SESSION_ID") + return {"final_response": "ok", "messages": []} + + state.agent.run_conversation = mock_run + + mock_conn = MagicMock(spec=acp.Client) + mock_conn.session_update = AsyncMock() + agent._conn = mock_conn + + prompt = [TextContentBlock(type="text", text="hi")] + await agent.prompt(prompt=prompt, session_id=new_resp.session_id) + + assert captured["inner"] == new_resp.session_id + # Outer scope must be restored. + assert os.environ.get("HERMES_SESSION_ID") == "outer-sess" + @pytest.mark.asyncio async def test_prompt_does_not_duplicate_streamed_final_message(self, agent): """If ACP already streamed response chunks, final_response should not be sent again.""" diff --git a/tests/hermes_cli/test_kanban_cli.py b/tests/hermes_cli/test_kanban_cli.py index ae0fdb768cd..e67bb94bdb7 100644 --- a/tests/hermes_cli/test_kanban_cli.py +++ b/tests/hermes_cli/test_kanban_cli.py @@ -156,6 +156,48 @@ def test_run_slash_tenant_filter(kanban_home): assert "biz-b task" in b and "biz-a task" not in b +def test_run_slash_session_filter(kanban_home): + """`hermes kanban list --session ` filters by the originating + chat session id stamped on tasks created from inside an ACP loop.""" + from hermes_cli import kanban_db as kb + with kb.connect() as conn: + kb.create_task( + conn, title="from sess-1 a", assignee="alice", session_id="sess-1" + ) + kb.create_task( + conn, title="from sess-1 b", assignee="alice", session_id="sess-1" + ) + kb.create_task( + conn, title="from sess-2", assignee="alice", session_id="sess-2" + ) + kb.create_task(conn, title="cli only", assignee="alice") + out_1 = kc.run_slash("list --session sess-1") + out_2 = kc.run_slash("list --session sess-2") + assert "from sess-1 a" in out_1 + assert "from sess-1 b" in out_1 + assert "from sess-2" not in out_1 + assert "cli only" not in out_1 + assert "from sess-2" in out_2 + assert "from sess-1 a" not in out_2 + + +def test_kanban_list_json_includes_session_id(kanban_home): + """JSON output exposes `session_id` so external clients (Scarf, web + dashboards) don't need a side query to filter by chat session.""" + from hermes_cli import kanban_db as kb + with kb.connect() as conn: + kb.create_task( + conn, title="acp task", assignee="alice", session_id="acp-x" + ) + raw = kc.run_slash("list --json") + payload = json.loads(raw) + assert any( + row.get("title") == "acp task" + and row.get("session_id") == "acp-x" + for row in payload + ) + + def test_run_slash_usage_error_returns_message(kanban_home): # Missing required argument for create out = kc.run_slash("create") diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index a90e2225320..a25a768689b 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -1035,6 +1035,76 @@ def test_tenant_propagates_to_events(kanban_home): assert created and created[0].payload.get("tenant") == "biz-a" +# --------------------------------------------------------------------------- +# Originating session id (ACP propagation) +# --------------------------------------------------------------------------- + +def test_create_task_stamps_session_id(kanban_home): + with kb.connect() as conn: + tid = kb.create_task( + conn, title="from chat", session_id="acp-sess-123" + ) + t = kb.get_task(conn, tid) + assert t is not None + assert t.session_id == "acp-sess-123" + + +def test_create_task_session_id_defaults_to_none(kanban_home): + with kb.connect() as conn: + tid = kb.create_task(conn, title="cli-created") + t = kb.get_task(conn, tid) + assert t is not None + assert t.session_id is None + + +def test_session_id_filters_listings(kanban_home): + with kb.connect() as conn: + kb.create_task(conn, title="s1-a", session_id="sess-1") + kb.create_task(conn, title="s1-b", session_id="sess-1") + kb.create_task(conn, title="s2-a", session_id="sess-2") + kb.create_task(conn, title="cli-only") # no session + sess1 = kb.list_tasks(conn, session_id="sess-1") + sess2 = kb.list_tasks(conn, session_id="sess-2") + unscoped = kb.list_tasks(conn) + assert sorted(t.title for t in sess1) == ["s1-a", "s1-b"] + assert [t.title for t in sess2] == ["s2-a"] + # Unscoped list still returns everything (legacy NULL rows visible). + assert len(unscoped) == 4 + + +def test_session_id_index_exists(kanban_home): + """The migration creates an index on session_id for cheap per-session + list queries on busy boards. Without it, a chat-scoped poll would + full-scan the tasks table.""" + with kb.connect() as conn: + rows = conn.execute( + "SELECT name FROM sqlite_master WHERE type='index' " + "AND tbl_name='tasks'" + ).fetchall() + names = {r["name"] for r in rows} + assert "idx_tasks_session_id" in names + + +def test_session_id_compose_with_tenant_filter(kanban_home): + """A client may want both `tenant=scarf:foo` AND `session=acp-x` — + the filters must AND, not replace.""" + with kb.connect() as conn: + kb.create_task( + conn, title="match", tenant="scarf:foo", session_id="acp-x" + ) + kb.create_task( + conn, title="wrong-tenant", tenant="other", session_id="acp-x" + ) + kb.create_task( + conn, title="wrong-session", + tenant="scarf:foo", session_id="acp-y", + ) + rows = kb.list_tasks( + conn, tenant="scarf:foo", session_id="acp-x" + ) + assert [t.title for t in rows] == ["match"] + + # --------------------------------------------------------------------------- # Shared-board path resolution (issue #19348) # @@ -1557,7 +1627,8 @@ def test_migrate_add_optional_columns_tolerates_concurrent_migration(kanban_home workflow_template_id TEXT, current_step_key TEXT, skills TEXT, - max_retries INTEGER + max_retries INTEGER, + session_id TEXT ) """ ) diff --git a/tests/tools/test_kanban_tools.py b/tests/tools/test_kanban_tools.py index 8159dfacf44..b654e434d68 100644 --- a/tests/tools/test_kanban_tools.py +++ b/tests/tools/test_kanban_tools.py @@ -768,6 +768,75 @@ def test_create_happy_path(worker_env): conn.close() +def test_create_stamps_session_id_from_env(monkeypatch, worker_env): + """When the agent loop runs under ACP, the server propagates the + originating chat session id via HERMES_SESSION_ID. ``kanban_create`` + reads it and stamps the new task so clients can render a per-session + board (issue: ACP session linkage on kanban tasks).""" + monkeypatch.setenv("HERMES_SESSION_ID", "acp-sess-abc") + from tools import kanban_tools as kt + from hermes_cli import kanban_db as kb + out = kt._handle_create({ + "title": "from chat", + "assignee": "peer", + "parents": [worker_env], + }) + d = json.loads(out) + assert d["ok"] is True + conn = kb.connect() + try: + new_task = kb.get_task(conn, d["task_id"]) + assert new_task.session_id == "acp-sess-abc" + finally: + conn.close() + + +def test_create_session_id_arg_overrides_env(monkeypatch, worker_env): + """An explicit ``session_id`` arg from the model wins over the env + propagation. Edge case but exercised: a tool call could carry a + different session id (e.g. cross-session linking) and the explicit + arg should not be silently overwritten.""" + monkeypatch.setenv("HERMES_SESSION_ID", "from-env") + from tools import kanban_tools as kt + from hermes_cli import kanban_db as kb + out = kt._handle_create({ + "title": "explicit override", + "assignee": "peer", + "parents": [worker_env], + "session_id": "explicit-arg", + }) + d = json.loads(out) + assert d["ok"] is True + conn = kb.connect() + try: + new_task = kb.get_task(conn, d["task_id"]) + assert new_task.session_id == "explicit-arg" + finally: + conn.close() + + +def test_create_session_id_absent_when_env_unset(monkeypatch, worker_env): + """No env var, no arg → session_id stays NULL. Important for backwards + compatibility: pre-ACP-propagation hosts and CLI-driven creates must + not accidentally inherit a stale id.""" + monkeypatch.delenv("HERMES_SESSION_ID", raising=False) + from tools import kanban_tools as kt + from hermes_cli import kanban_db as kb + out = kt._handle_create({ + "title": "no session", + "assignee": "peer", + "parents": [worker_env], + }) + d = json.loads(out) + assert d["ok"] is True + conn = kb.connect() + try: + new_task = kb.get_task(conn, d["task_id"]) + assert new_task.session_id is None + finally: + conn.close() + + def test_create_rejects_no_title(worker_env): from tools import kanban_tools as kt assert json.loads(kt._handle_create({"assignee": "x"})).get("error") diff --git a/tools/kanban_tools.py b/tools/kanban_tools.py index d80577abd8e..29b5618e681 100644 --- a/tools/kanban_tools.py +++ b/tools/kanban_tools.py @@ -654,6 +654,10 @@ def _handle_create(args: dict, **kw) -> str: body = args.get("body") parents = args.get("parents") or [] tenant = args.get("tenant") or os.environ.get("HERMES_TENANT") + # Stamp the originating session id when the agent loop runs under + # ACP (which sets HERMES_SESSION_ID before invoking tools). NULL on + # CLI / dashboard paths and on legacy hosts that don't set the env. + session_id = args.get("session_id") or os.environ.get("HERMES_SESSION_ID") priority = args.get("priority") workspace_kind = args.get("workspace_kind") or "scratch" workspace_path = args.get("workspace_path") @@ -700,6 +704,7 @@ def _handle_create(args: dict, **kw) -> str: skills=skills, initial_status=str(initial_status), created_by=os.environ.get("HERMES_PROFILE") or "worker", + session_id=session_id, ) new_task = kb.get_task(conn, new_tid) return _ok(