diff --git a/acp_adapter/provenance.py b/acp_adapter/provenance.py new file mode 100644 index 00000000000..58b05daf5af --- /dev/null +++ b/acp_adapter/provenance.py @@ -0,0 +1,127 @@ +"""Derive ACP session-provenance metadata from the existing compression chain. + +This is an additive Hermes extension surfaced under ACP ``_meta.hermes`` so +existing ACP clients ignore it. It carries no new persisted state: everything +is derived on demand from the ``sessions`` table (``parent_session_id`` / +``end_reason``), which already models compression-continuation chains. + +The ACP/editor ``session_id`` stays the stable public handle. When context +compression rotates the internal Hermes head, ``build_session_provenance`` lets +a client see the previous/current internal ids and the lineage root without +parsing status text, guessing from token drops, or reading ``state.db``. +""" + +from __future__ import annotations + +from typing import Any, Dict, Optional + +# Bound defensive walks; compression chains this deep are pathological. +_MAX_WALK = 100 + + +def build_session_provenance( + db: Any, + acp_session_id: str, + current_hermes_session_id: str, + *, + previous_hermes_session_id: Optional[str] = None, +) -> Optional[Dict[str, Any]]: + """Build ``_meta.hermes.sessionProvenance`` for an ACP session. + + Args: + db: A ``SessionDB`` (must expose ``get_session``). + acp_session_id: The stable ACP/editor-facing session handle. + current_hermes_session_id: The live internal Hermes DB session id + (``state.agent.session_id``). + previous_hermes_session_id: The internal id from before the most recent + turn, when known. Supplied by ``prompt()`` to flag a rotation. + + Returns: + A dict suitable for ``{"hermes": {"sessionProvenance": }}`` under + ACP ``_meta``, or ``None`` if the session can't be read. + """ + try: + row = db.get_session(current_hermes_session_id) + except Exception: + return None + if not row: + return None + + parent_id = row.get("parent_session_id") + end_reason = row.get("end_reason") + + # Walk parents to the lineage root and count compression depth. Only + # compression-split parents (parent.end_reason == 'compression') count + # toward depth — delegate/branch children share the parent_session_id + # column but are not compaction boundaries. + root_id = current_hermes_session_id + compression_depth = 0 + cursor_parent = parent_id + seen = {current_hermes_session_id} + for _ in range(_MAX_WALK): + if not cursor_parent or cursor_parent in seen: + break + seen.add(cursor_parent) + try: + prow = db.get_session(cursor_parent) + except Exception: + prow = None + if not prow: + break + root_id = cursor_parent + if prow.get("end_reason") == "compression": + compression_depth += 1 + cursor_parent = prow.get("parent_session_id") + + # A session is a compression continuation when its parent was ended with + # end_reason='compression'. Determine that from the immediate parent. + is_continuation = False + if parent_id: + try: + immediate_parent = db.get_session(parent_id) + except Exception: + immediate_parent = None + if immediate_parent and immediate_parent.get("end_reason") == "compression": + is_continuation = True + + rotated = bool( + previous_hermes_session_id + and previous_hermes_session_id != current_hermes_session_id + ) + + provenance: Dict[str, Any] = { + "acpSessionId": acp_session_id, + "currentHermesSessionId": current_hermes_session_id, + "rootHermesSessionId": root_id, + "parentHermesSessionId": parent_id, + "sessionKind": "continuation" if is_continuation else "root", + "compressionDepth": compression_depth, + } + if previous_hermes_session_id: + provenance["previousHermesSessionId"] = previous_hermes_session_id + if rotated: + # The head moved during the last turn. The only mechanism that rotates + # the internal id mid-turn is compression-driven session splitting. + provenance["reason"] = "compression" + provenance["creatorKind"] = "compression" + + return provenance + + +def session_provenance_meta( + db: Any, + acp_session_id: str, + current_hermes_session_id: str, + *, + previous_hermes_session_id: Optional[str] = None, +) -> Optional[Dict[str, Any]]: + """Return a ready ``_meta`` payload: ``{"hermes": {"sessionProvenance": ...}}``.""" + prov = build_session_provenance( + db, + acp_session_id, + current_hermes_session_id, + previous_hermes_session_id=previous_hermes_session_id, + ) + if prov is None: + return None + return {"hermes": {"sessionProvenance": prov}} diff --git a/acp_adapter/server.py b/acp_adapter/server.py index b4195af87d8..6901fe28e88 100644 --- a/acp_adapter/server.py +++ b/acp_adapter/server.py @@ -71,6 +71,7 @@ from acp_adapter.events import ( make_tool_progress_cb, ) from acp_adapter.permissions import make_approval_callback +from acp_adapter.provenance import session_provenance_meta from acp_adapter.session import SessionManager, SessionState, _expand_acp_enabled_toolsets from acp_adapter.tools import build_tool_complete, build_tool_start @@ -709,8 +710,39 @@ class HermesACPAgent(acp.Agent): exc_info=True, ) - async def _send_session_info_update(self, session_id: str) -> None: - """Send ACP native session metadata after Hermes changes it.""" + def _provenance_meta( + self, + acp_session_id: str, + current_hermes_session_id: str, + previous_hermes_session_id: Optional[str] = None, + ) -> Optional[dict]: + """Best-effort ``_meta.hermes.sessionProvenance`` for an ACP session.""" + try: + return session_provenance_meta( + self.session_manager._get_db(), + acp_session_id, + current_hermes_session_id, + previous_hermes_session_id=previous_hermes_session_id, + ) + except Exception: + logger.debug( + "Could not build ACP session provenance for %s", acp_session_id, exc_info=True + ) + return None + + async def _send_session_info_update( + self, + session_id: str, + *, + current_hermes_session_id: Optional[str] = None, + previous_hermes_session_id: Optional[str] = None, + ) -> None: + """Send ACP native session metadata after Hermes changes it. + + When the internal Hermes head rotated (e.g. compression-driven session + split during a turn), pass ``previous_hermes_session_id`` so the + attached ``_meta.hermes.sessionProvenance`` flags the rotation reason. + """ if not self._conn: return try: @@ -727,10 +759,16 @@ class HermesACPAgent(acp.Agent): # the updated_at since we're emitting this notification precisely # because the title was just refreshed. updated_at = datetime.now(timezone.utc).isoformat() + meta = self._provenance_meta( + session_id, + current_hermes_session_id or session_id, + previous_hermes_session_id, + ) update = SessionInfoUpdate( session_update="session_info_update", title=title if isinstance(title, str) and title.strip() else None, updated_at=updated_at, + field_meta=meta, ) try: await self._conn.session_update( @@ -1081,6 +1119,9 @@ class HermesACPAgent(acp.Agent): session_id=state.session_id, models=self._build_model_state(state), modes=self._session_modes(state), + field_meta=self._provenance_meta( + state.session_id, getattr(state.agent, "session_id", state.session_id) + ), ) async def load_session( @@ -1125,6 +1166,9 @@ class HermesACPAgent(acp.Agent): return LoadSessionResponse( models=self._build_model_state(state), modes=self._session_modes(state), + field_meta=self._provenance_meta( + session_id, getattr(state.agent, "session_id", session_id) + ), ) async def resume_session( @@ -1157,6 +1201,9 @@ class HermesACPAgent(acp.Agent): return ResumeSessionResponse( models=self._build_model_state(state), modes=self._session_modes(state), + field_meta=self._provenance_meta( + state.session_id, getattr(state.agent, "session_id", state.session_id) + ), ) async def cancel(self, session_id: str, **kwargs: Any) -> None: @@ -1494,6 +1541,11 @@ class HermesACPAgent(acp.Agent): logger.debug("Could not clear ACP session context", exc_info=True) try: + # Snapshot the internal Hermes DB session id before the turn so we + # can detect a compression-driven session rotation afterwards. The + # ACP `session_id` stays the stable client handle; agent.session_id + # is the live internal head that compression may rotate. + pre_turn_hermes_id = getattr(state.agent, "session_id", None) # Wrap the executor call in a fresh copy of the current context so # concurrent ACP sessions on the shared ThreadPoolExecutor don't # stomp on each other's ContextVar writes (HERMES_SESSION_KEY in @@ -1512,6 +1564,30 @@ class HermesACPAgent(acp.Agent): # Persist updated history so sessions survive process restarts. self.session_manager.save_session(session_id) + # Detect a compression-driven internal session rotation. If the agent's + # DB head moved during the turn, emit a session_info_update carrying + # _meta.hermes.sessionProvenance so ACP clients can render the boundary + # and keep old/new ids in lineage. The ACP session_id is unchanged. + post_turn_hermes_id = getattr(state.agent, "session_id", None) + if ( + conn + and post_turn_hermes_id + and pre_turn_hermes_id + and post_turn_hermes_id != pre_turn_hermes_id + ): + try: + await self._send_session_info_update( + session_id, + current_hermes_session_id=post_turn_hermes_id, + previous_hermes_session_id=pre_turn_hermes_id, + ) + except Exception: + logger.debug( + "Could not emit ACP provenance update after rotation for %s", + session_id, + exc_info=True, + ) + final_response = result.get("final_response", "") cancelled = bool(state.cancel_event and state.cancel_event.is_set()) interrupted = bool(result.get("interrupted")) or cancelled diff --git a/tests/acp/test_session_provenance.py b/tests/acp/test_session_provenance.py new file mode 100644 index 00000000000..b1d80907cf5 --- /dev/null +++ b/tests/acp/test_session_provenance.py @@ -0,0 +1,103 @@ +"""Tests for ACP session-provenance derivation (issue #33617). + +Exercises acp_adapter.provenance against a real SessionDB — no mocks — covering +the acceptance-criteria matrix: root session, compression-split continuation, +multi-depth chains, rotation flagging, and graceful handling of unknown ids. +""" + +import time + +import pytest + +from acp_adapter.provenance import build_session_provenance, session_provenance_meta +from hermes_state import SessionDB + + +@pytest.fixture() +def db(tmp_path): + d = SessionDB(db_path=tmp_path / "state.db") + yield d + + +def _mk(db, sid, parent=None): + db.create_session(session_id=sid, source="acp", parent_session_id=parent) + + +def test_root_session_no_compression(db): + _mk(db, "root1") + prov = build_session_provenance(db, "acp-1", "root1") + assert prov["acpSessionId"] == "acp-1" + assert prov["currentHermesSessionId"] == "root1" + assert prov["rootHermesSessionId"] == "root1" + assert prov["parentHermesSessionId"] is None + assert prov["sessionKind"] == "root" + assert prov["compressionDepth"] == 0 + assert "reason" not in prov # no rotation signalled + + +def test_compression_split_continuation(db): + # Parent ended with compression, child created afterwards. + _mk(db, "old") + db.end_session("old", "compression") + time.sleep(0.001) + _mk(db, "new", parent="old") + + prov = build_session_provenance( + db, "acp-1", "new", previous_hermes_session_id="old" + ) + assert prov["sessionKind"] == "continuation" + assert prov["parentHermesSessionId"] == "old" + assert prov["rootHermesSessionId"] == "old" + assert prov["compressionDepth"] == 1 + assert prov["previousHermesSessionId"] == "old" + # Head rotated this turn → reason/creatorKind flagged. + assert prov["reason"] == "compression" + assert prov["creatorKind"] == "compression" + + +def test_multi_depth_chain(db): + _mk(db, "s0") + db.end_session("s0", "compression") + _mk(db, "s1", parent="s0") + db.end_session("s1", "compression") + _mk(db, "s2", parent="s1") + + prov = build_session_provenance(db, "acp-1", "s2") + assert prov["rootHermesSessionId"] == "s0" + assert prov["compressionDepth"] == 2 + assert prov["sessionKind"] == "continuation" + + +def test_non_compression_parent_is_root_not_continuation(db): + # A child with a parent that did NOT end via compression (e.g. delegate + # or branch child) must not be reported as a compression continuation. + _mk(db, "p") + _mk(db, "c", parent="p") # parent still live, no end_reason + prov = build_session_provenance(db, "acp-1", "c") + assert prov["sessionKind"] == "root" + assert prov["compressionDepth"] == 0 + assert prov["rootHermesSessionId"] == "p" # lineage root still walked + + +def test_no_false_rotation_when_head_unchanged(db): + _mk(db, "s") + # previous == current → no rotation reason emitted. + prov = build_session_provenance( + db, "acp-1", "s", previous_hermes_session_id="s" + ) + assert "reason" not in prov + assert "creatorKind" not in prov + assert prov["previousHermesSessionId"] == "s" + + +def test_unknown_session_returns_none(db): + assert build_session_provenance(db, "acp-1", "does-not-exist") is None + assert session_provenance_meta(db, "acp-1", "does-not-exist") is None + + +def test_meta_wrapper_shape(db): + _mk(db, "root1") + meta = session_provenance_meta(db, "acp-1", "root1") + assert set(meta.keys()) == {"hermes"} + assert "sessionProvenance" in meta["hermes"] + assert meta["hermes"]["sessionProvenance"]["currentHermesSessionId"] == "root1"