feat(acp): emit session provenance metadata for compression rotation (#41724)

Closes #33617. Adds additive _meta.hermes.sessionProvenance to ACP session
surfaces so clients can detect compression-driven internal session rotation
without parsing status text, guessing from token drops, or reading state.db.

Derived on demand from the existing compression chain (parent_session_id /
end_reason) — no new persisted state, no schema change, no ACP protocol change.
ACP session_id stays the stable client handle.

- acp_adapter/provenance.py: derive provenance from SessionDB
- server.py: attach _meta to new/load/resume responses; emit a
  session_info_update when the internal head rotates during a prompt
This commit is contained in:
Teknium 2026-06-07 22:22:21 -07:00 committed by GitHub
parent 240c5d4543
commit 777dc9da62
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 308 additions and 2 deletions

127
acp_adapter/provenance.py Normal file
View file

@ -0,0 +1,127 @@
"""Derive ACP session-provenance metadata from the existing compression chain.
This is an additive Hermes extension surfaced under ACP ``_meta.hermes`` so
existing ACP clients ignore it. It carries no new persisted state: everything
is derived on demand from the ``sessions`` table (``parent_session_id`` /
``end_reason``), which already models compression-continuation chains.
The ACP/editor ``session_id`` stays the stable public handle. When context
compression rotates the internal Hermes head, ``build_session_provenance`` lets
a client see the previous/current internal ids and the lineage root without
parsing status text, guessing from token drops, or reading ``state.db``.
"""
from __future__ import annotations
from typing import Any, Dict, Optional
# Bound defensive walks; compression chains this deep are pathological.
_MAX_WALK = 100
def build_session_provenance(
db: Any,
acp_session_id: str,
current_hermes_session_id: str,
*,
previous_hermes_session_id: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""Build ``_meta.hermes.sessionProvenance`` for an ACP session.
Args:
db: A ``SessionDB`` (must expose ``get_session``).
acp_session_id: The stable ACP/editor-facing session handle.
current_hermes_session_id: The live internal Hermes DB session id
(``state.agent.session_id``).
previous_hermes_session_id: The internal id from before the most recent
turn, when known. Supplied by ``prompt()`` to flag a rotation.
Returns:
A dict suitable for ``{"hermes": {"sessionProvenance": <dict>}}`` under
ACP ``_meta``, or ``None`` if the session can't be read.
"""
try:
row = db.get_session(current_hermes_session_id)
except Exception:
return None
if not row:
return None
parent_id = row.get("parent_session_id")
end_reason = row.get("end_reason")
# Walk parents to the lineage root and count compression depth. Only
# compression-split parents (parent.end_reason == 'compression') count
# toward depth — delegate/branch children share the parent_session_id
# column but are not compaction boundaries.
root_id = current_hermes_session_id
compression_depth = 0
cursor_parent = parent_id
seen = {current_hermes_session_id}
for _ in range(_MAX_WALK):
if not cursor_parent or cursor_parent in seen:
break
seen.add(cursor_parent)
try:
prow = db.get_session(cursor_parent)
except Exception:
prow = None
if not prow:
break
root_id = cursor_parent
if prow.get("end_reason") == "compression":
compression_depth += 1
cursor_parent = prow.get("parent_session_id")
# A session is a compression continuation when its parent was ended with
# end_reason='compression'. Determine that from the immediate parent.
is_continuation = False
if parent_id:
try:
immediate_parent = db.get_session(parent_id)
except Exception:
immediate_parent = None
if immediate_parent and immediate_parent.get("end_reason") == "compression":
is_continuation = True
rotated = bool(
previous_hermes_session_id
and previous_hermes_session_id != current_hermes_session_id
)
provenance: Dict[str, Any] = {
"acpSessionId": acp_session_id,
"currentHermesSessionId": current_hermes_session_id,
"rootHermesSessionId": root_id,
"parentHermesSessionId": parent_id,
"sessionKind": "continuation" if is_continuation else "root",
"compressionDepth": compression_depth,
}
if previous_hermes_session_id:
provenance["previousHermesSessionId"] = previous_hermes_session_id
if rotated:
# The head moved during the last turn. The only mechanism that rotates
# the internal id mid-turn is compression-driven session splitting.
provenance["reason"] = "compression"
provenance["creatorKind"] = "compression"
return provenance
def session_provenance_meta(
db: Any,
acp_session_id: str,
current_hermes_session_id: str,
*,
previous_hermes_session_id: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""Return a ready ``_meta`` payload: ``{"hermes": {"sessionProvenance": ...}}``."""
prov = build_session_provenance(
db,
acp_session_id,
current_hermes_session_id,
previous_hermes_session_id=previous_hermes_session_id,
)
if prov is None:
return None
return {"hermes": {"sessionProvenance": prov}}

View file

@ -71,6 +71,7 @@ from acp_adapter.events import (
make_tool_progress_cb,
)
from acp_adapter.permissions import make_approval_callback
from acp_adapter.provenance import session_provenance_meta
from acp_adapter.session import SessionManager, SessionState, _expand_acp_enabled_toolsets
from acp_adapter.tools import build_tool_complete, build_tool_start
@ -709,8 +710,39 @@ class HermesACPAgent(acp.Agent):
exc_info=True,
)
async def _send_session_info_update(self, session_id: str) -> None:
"""Send ACP native session metadata after Hermes changes it."""
def _provenance_meta(
self,
acp_session_id: str,
current_hermes_session_id: str,
previous_hermes_session_id: Optional[str] = None,
) -> Optional[dict]:
"""Best-effort ``_meta.hermes.sessionProvenance`` for an ACP session."""
try:
return session_provenance_meta(
self.session_manager._get_db(),
acp_session_id,
current_hermes_session_id,
previous_hermes_session_id=previous_hermes_session_id,
)
except Exception:
logger.debug(
"Could not build ACP session provenance for %s", acp_session_id, exc_info=True
)
return None
async def _send_session_info_update(
self,
session_id: str,
*,
current_hermes_session_id: Optional[str] = None,
previous_hermes_session_id: Optional[str] = None,
) -> None:
"""Send ACP native session metadata after Hermes changes it.
When the internal Hermes head rotated (e.g. compression-driven session
split during a turn), pass ``previous_hermes_session_id`` so the
attached ``_meta.hermes.sessionProvenance`` flags the rotation reason.
"""
if not self._conn:
return
try:
@ -727,10 +759,16 @@ class HermesACPAgent(acp.Agent):
# the updated_at since we're emitting this notification precisely
# because the title was just refreshed.
updated_at = datetime.now(timezone.utc).isoformat()
meta = self._provenance_meta(
session_id,
current_hermes_session_id or session_id,
previous_hermes_session_id,
)
update = SessionInfoUpdate(
session_update="session_info_update",
title=title if isinstance(title, str) and title.strip() else None,
updated_at=updated_at,
field_meta=meta,
)
try:
await self._conn.session_update(
@ -1081,6 +1119,9 @@ class HermesACPAgent(acp.Agent):
session_id=state.session_id,
models=self._build_model_state(state),
modes=self._session_modes(state),
field_meta=self._provenance_meta(
state.session_id, getattr(state.agent, "session_id", state.session_id)
),
)
async def load_session(
@ -1125,6 +1166,9 @@ class HermesACPAgent(acp.Agent):
return LoadSessionResponse(
models=self._build_model_state(state),
modes=self._session_modes(state),
field_meta=self._provenance_meta(
session_id, getattr(state.agent, "session_id", session_id)
),
)
async def resume_session(
@ -1157,6 +1201,9 @@ class HermesACPAgent(acp.Agent):
return ResumeSessionResponse(
models=self._build_model_state(state),
modes=self._session_modes(state),
field_meta=self._provenance_meta(
state.session_id, getattr(state.agent, "session_id", state.session_id)
),
)
async def cancel(self, session_id: str, **kwargs: Any) -> None:
@ -1494,6 +1541,11 @@ class HermesACPAgent(acp.Agent):
logger.debug("Could not clear ACP session context", exc_info=True)
try:
# Snapshot the internal Hermes DB session id before the turn so we
# can detect a compression-driven session rotation afterwards. The
# ACP `session_id` stays the stable client handle; agent.session_id
# is the live internal head that compression may rotate.
pre_turn_hermes_id = getattr(state.agent, "session_id", None)
# Wrap the executor call in a fresh copy of the current context so
# concurrent ACP sessions on the shared ThreadPoolExecutor don't
# stomp on each other's ContextVar writes (HERMES_SESSION_KEY in
@ -1512,6 +1564,30 @@ class HermesACPAgent(acp.Agent):
# Persist updated history so sessions survive process restarts.
self.session_manager.save_session(session_id)
# Detect a compression-driven internal session rotation. If the agent's
# DB head moved during the turn, emit a session_info_update carrying
# _meta.hermes.sessionProvenance so ACP clients can render the boundary
# and keep old/new ids in lineage. The ACP session_id is unchanged.
post_turn_hermes_id = getattr(state.agent, "session_id", None)
if (
conn
and post_turn_hermes_id
and pre_turn_hermes_id
and post_turn_hermes_id != pre_turn_hermes_id
):
try:
await self._send_session_info_update(
session_id,
current_hermes_session_id=post_turn_hermes_id,
previous_hermes_session_id=pre_turn_hermes_id,
)
except Exception:
logger.debug(
"Could not emit ACP provenance update after rotation for %s",
session_id,
exc_info=True,
)
final_response = result.get("final_response", "")
cancelled = bool(state.cancel_event and state.cancel_event.is_set())
interrupted = bool(result.get("interrupted")) or cancelled

View file

@ -0,0 +1,103 @@
"""Tests for ACP session-provenance derivation (issue #33617).
Exercises acp_adapter.provenance against a real SessionDB no mocks covering
the acceptance-criteria matrix: root session, compression-split continuation,
multi-depth chains, rotation flagging, and graceful handling of unknown ids.
"""
import time
import pytest
from acp_adapter.provenance import build_session_provenance, session_provenance_meta
from hermes_state import SessionDB
@pytest.fixture()
def db(tmp_path):
d = SessionDB(db_path=tmp_path / "state.db")
yield d
def _mk(db, sid, parent=None):
db.create_session(session_id=sid, source="acp", parent_session_id=parent)
def test_root_session_no_compression(db):
_mk(db, "root1")
prov = build_session_provenance(db, "acp-1", "root1")
assert prov["acpSessionId"] == "acp-1"
assert prov["currentHermesSessionId"] == "root1"
assert prov["rootHermesSessionId"] == "root1"
assert prov["parentHermesSessionId"] is None
assert prov["sessionKind"] == "root"
assert prov["compressionDepth"] == 0
assert "reason" not in prov # no rotation signalled
def test_compression_split_continuation(db):
# Parent ended with compression, child created afterwards.
_mk(db, "old")
db.end_session("old", "compression")
time.sleep(0.001)
_mk(db, "new", parent="old")
prov = build_session_provenance(
db, "acp-1", "new", previous_hermes_session_id="old"
)
assert prov["sessionKind"] == "continuation"
assert prov["parentHermesSessionId"] == "old"
assert prov["rootHermesSessionId"] == "old"
assert prov["compressionDepth"] == 1
assert prov["previousHermesSessionId"] == "old"
# Head rotated this turn → reason/creatorKind flagged.
assert prov["reason"] == "compression"
assert prov["creatorKind"] == "compression"
def test_multi_depth_chain(db):
_mk(db, "s0")
db.end_session("s0", "compression")
_mk(db, "s1", parent="s0")
db.end_session("s1", "compression")
_mk(db, "s2", parent="s1")
prov = build_session_provenance(db, "acp-1", "s2")
assert prov["rootHermesSessionId"] == "s0"
assert prov["compressionDepth"] == 2
assert prov["sessionKind"] == "continuation"
def test_non_compression_parent_is_root_not_continuation(db):
# A child with a parent that did NOT end via compression (e.g. delegate
# or branch child) must not be reported as a compression continuation.
_mk(db, "p")
_mk(db, "c", parent="p") # parent still live, no end_reason
prov = build_session_provenance(db, "acp-1", "c")
assert prov["sessionKind"] == "root"
assert prov["compressionDepth"] == 0
assert prov["rootHermesSessionId"] == "p" # lineage root still walked
def test_no_false_rotation_when_head_unchanged(db):
_mk(db, "s")
# previous == current → no rotation reason emitted.
prov = build_session_provenance(
db, "acp-1", "s", previous_hermes_session_id="s"
)
assert "reason" not in prov
assert "creatorKind" not in prov
assert prov["previousHermesSessionId"] == "s"
def test_unknown_session_returns_none(db):
assert build_session_provenance(db, "acp-1", "does-not-exist") is None
assert session_provenance_meta(db, "acp-1", "does-not-exist") is None
def test_meta_wrapper_shape(db):
_mk(db, "root1")
meta = session_provenance_meta(db, "acp-1", "root1")
assert set(meta.keys()) == {"hermes"}
assert "sessionProvenance" in meta["hermes"]
assert meta["hermes"]["sessionProvenance"]["currentHermesSessionId"] == "root1"