feat(api-server): X-Hermes-Session-Key header for long-term memory scoping (#20199)

* feat(api-server): X-Hermes-Session-Key header for long-term memory scoping

API Server integrations (Open WebUI, custom web UIs) can now pass a stable
per-channel identifier via X-Hermes-Session-Key that scopes long-term memory
(Honcho, etc.) independently of the transcript-scoped X-Hermes-Session-Id.
This matches the native gateway's session_key / session_id split: one stable
key per assistant channel, many independent transcripts that rotate on /new.

- _create_agent and _run_agent accept gateway_session_key and pass it to
  AIAgent(gateway_session_key=...), which is already honored by the Honcho
  memory provider (plugins/memory/honcho/client.py resolve_session_name).
- New shared helper _parse_session_key_header applies the same API-key
  gate, control-character sanitization, and a 256-char length cap as the
  existing session-id header.
- All three agent endpoints honor the header: /v1/chat/completions,
  /v1/responses, /v1/runs. JSON and SSE responses echo it back.
- /v1/capabilities advertises session_key_header so clients can
  feature-detect.

Closes #20060.

Co-authored-by: Andy Stewart <lazycat.manatee@gmail.com>

* chore: AUTHOR_MAP entry for manateelazycat

---------

Co-authored-by: Andy Stewart <lazycat.manatee@gmail.com>
This commit is contained in:
Teknium 2026-05-05 05:34:47 -07:00 committed by GitHub
parent 436672de0e
commit fe8560fc12
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 310 additions and 5 deletions

View file

@ -2,8 +2,8 @@
OpenAI-compatible API server platform adapter.
Exposes an HTTP server with endpoints:
- POST /v1/chat/completions OpenAI Chat Completions format (stateless; opt-in session continuity via X-Hermes-Session-Id header)
- POST /v1/responses OpenAI Responses API format (stateful via previous_response_id)
- POST /v1/chat/completions OpenAI Chat Completions format (stateless; opt-in session continuity via X-Hermes-Session-Id header; opt-in long-term memory scoping via X-Hermes-Session-Key header)
- POST /v1/responses OpenAI Responses API format (stateful via previous_response_id; X-Hermes-Session-Key supported)
- GET /v1/responses/{response_id} Retrieve a stored response
- DELETE /v1/responses/{response_id} Delete a stored response
- GET /v1/models lists hermes-agent as an available model
@ -698,6 +698,71 @@ class APIServerAdapter(BasePlatformAdapter):
status=401,
)
# ------------------------------------------------------------------
# Session header helpers
# ------------------------------------------------------------------
# Soft length cap for session identifiers. Headers are bounded in
# aggregate by aiohttp (``client_max_size`` / default 8 KiB per
# header), but we impose a tighter limit on the session headers so a
# caller can't burn memory by passing a multi-kilobyte "session key".
# 256 chars is well above any realistic stable channel identifier
# (e.g. ``agent:main:webui:dm:user-42``) while staying small enough
# that the sanitized form is safe to pass into Honcho / state.db.
_MAX_SESSION_HEADER_LEN = 256
def _parse_session_key_header(
self, request: "web.Request"
) -> tuple[Optional[str], Optional["web.Response"]]:
"""Extract and validate the ``X-Hermes-Session-Key`` header.
The session key is a stable per-channel identifier that scopes
long-term memory (e.g. Honcho sessions) across transcripts. It
is independent of ``X-Hermes-Session-Id``: callers may send
either, both, or neither.
Returns ``(session_key, None)`` on success (with an empty/absent
header yielding ``None`` for the key), or ``(None, error_response)``
on validation failure.
Security: like session continuation, accepting a caller-supplied
memory scope requires API-key authentication so that an
unauthenticated client on a local-only server can't inject itself
into another user's long-term memory scope by guessing a key.
"""
raw = request.headers.get("X-Hermes-Session-Key", "").strip()
if not raw:
return None, None
if not self._api_key:
logger.warning(
"X-Hermes-Session-Key rejected: no API key configured. "
"Set API_SERVER_KEY to enable long-term memory scoping."
)
return None, web.json_response(
_openai_error(
"X-Hermes-Session-Key requires API key authentication. "
"Configure API_SERVER_KEY to enable this feature."
),
status=403,
)
# Reject control characters that could enable header injection on
# the echo path.
if re.search(r'[\r\n\x00]', raw):
return None, web.json_response(
{"error": {"message": "Invalid session key", "type": "invalid_request_error"}},
status=400,
)
if len(raw) > self._MAX_SESSION_HEADER_LEN:
return None, web.json_response(
{"error": {"message": "Session key too long", "type": "invalid_request_error"}},
status=400,
)
return raw, None
# ------------------------------------------------------------------
# Session DB helper
# ------------------------------------------------------------------
@ -728,6 +793,7 @@ class APIServerAdapter(BasePlatformAdapter):
tool_progress_callback=None,
tool_start_callback=None,
tool_complete_callback=None,
gateway_session_key: Optional[str] = None,
) -> Any:
"""
Create an AIAgent instance using the gateway's runtime config.
@ -736,6 +802,13 @@ class APIServerAdapter(BasePlatformAdapter):
base_url, etc. from config.yaml / env vars. Toolsets are resolved
from config.yaml platform_toolsets.api_server (same as all other
gateway platforms), falling back to the hermes-api-server default.
``gateway_session_key`` is a stable per-channel identifier supplied
by the client (via ``X-Hermes-Session-Key``). Unlike ``session_id``
which scopes the short-term transcript and rotates on /new, this
key is meant to persist across transcripts so long-term memory
providers (e.g. Honcho) can scope their per-chat state correctly
matching the semantics of the native gateway's ``session_key``.
"""
from run_agent import AIAgent
from gateway.run import _resolve_runtime_agent_kwargs, _resolve_gateway_model, _load_gateway_config, GatewayRunner
@ -771,6 +844,7 @@ class APIServerAdapter(BasePlatformAdapter):
session_db=self._ensure_session_db(),
fallback_model=fallback_model,
reasoning_config=reasoning_config,
gateway_session_key=gateway_session_key,
)
return agent
@ -854,6 +928,7 @@ class APIServerAdapter(BasePlatformAdapter):
"run_stop": True,
"tool_progress_events": True,
"session_continuity_header": "X-Hermes-Session-Id",
"session_key_header": "X-Hermes-Session-Key",
"cors": bool(self._cors_origins),
},
"endpoints": {
@ -925,6 +1000,15 @@ class APIServerAdapter(BasePlatformAdapter):
status=400,
)
# Allow caller to scope long-term memory (e.g. Honcho) with a
# stable per-channel identifier via X-Hermes-Session-Key. This
# is independent of X-Hermes-Session-Id: the key persists across
# transcripts while the id rotates when the caller starts a new
# transcript (i.e. /new semantics). See _parse_session_key_header.
gateway_session_key, key_err = self._parse_session_key_header(request)
if key_err is not None:
return key_err
# Allow caller to continue an existing session by passing X-Hermes-Session-Id.
# When provided, history is loaded from state.db instead of from the request body.
#
@ -1059,11 +1143,13 @@ class APIServerAdapter(BasePlatformAdapter):
tool_start_callback=_on_tool_start,
tool_complete_callback=_on_tool_complete,
agent_ref=agent_ref,
gateway_session_key=gateway_session_key,
))
return await self._write_sse_chat_completion(
request, completion_id, model_name, created, _stream_q,
agent_task, agent_ref, session_id=session_id,
gateway_session_key=gateway_session_key,
)
# Non-streaming: run the agent (with optional Idempotency-Key)
@ -1073,6 +1159,7 @@ class APIServerAdapter(BasePlatformAdapter):
conversation_history=history,
ephemeral_system_prompt=system_prompt,
session_id=session_id,
gateway_session_key=gateway_session_key,
)
idempotency_key = request.headers.get("Idempotency-Key")
@ -1122,11 +1209,15 @@ class APIServerAdapter(BasePlatformAdapter):
},
}
return web.json_response(response_data, headers={"X-Hermes-Session-Id": session_id})
response_headers = {"X-Hermes-Session-Id": session_id}
if gateway_session_key:
response_headers["X-Hermes-Session-Key"] = gateway_session_key
return web.json_response(response_data, headers=response_headers)
async def _write_sse_chat_completion(
self, request: "web.Request", completion_id: str, model: str,
created: int, stream_q, agent_task, agent_ref=None, session_id: str = None,
gateway_session_key: str = None,
) -> "web.StreamResponse":
"""Write real streaming SSE from agent's stream_delta_callback queue.
@ -1149,6 +1240,8 @@ class APIServerAdapter(BasePlatformAdapter):
sse_headers.update(cors)
if session_id:
sse_headers["X-Hermes-Session-Id"] = session_id
if gateway_session_key:
sse_headers["X-Hermes-Session-Key"] = gateway_session_key
response = web.StreamResponse(status=200, headers=sse_headers)
await response.prepare(request)
@ -1272,6 +1365,7 @@ class APIServerAdapter(BasePlatformAdapter):
conversation: Optional[str],
store: bool,
session_id: str,
gateway_session_key: Optional[str] = None,
) -> "web.StreamResponse":
"""Write an SSE stream for POST /v1/responses (OpenAI Responses API).
@ -1314,6 +1408,8 @@ class APIServerAdapter(BasePlatformAdapter):
sse_headers.update(cors)
if session_id:
sse_headers["X-Hermes-Session-Id"] = session_id
if gateway_session_key:
sse_headers["X-Hermes-Session-Key"] = gateway_session_key
response = web.StreamResponse(status=200, headers=sse_headers)
await response.prepare(request)
@ -1763,6 +1859,11 @@ class APIServerAdapter(BasePlatformAdapter):
if auth_err:
return auth_err
# Long-term memory scope header (see chat_completions for details).
gateway_session_key, key_err = self._parse_session_key_header(request)
if key_err is not None:
return key_err
# Parse request body
try:
body = await request.json()
@ -1914,6 +2015,7 @@ class APIServerAdapter(BasePlatformAdapter):
tool_start_callback=_on_tool_start,
tool_complete_callback=_on_tool_complete,
agent_ref=agent_ref,
gateway_session_key=gateway_session_key,
))
response_id = f"resp_{uuid.uuid4().hex[:28]}"
@ -1934,6 +2036,7 @@ class APIServerAdapter(BasePlatformAdapter):
conversation=conversation,
store=store,
session_id=session_id,
gateway_session_key=gateway_session_key,
)
async def _compute_response():
@ -1942,6 +2045,7 @@ class APIServerAdapter(BasePlatformAdapter):
conversation_history=conversation_history,
ephemeral_system_prompt=instructions,
session_id=session_id,
gateway_session_key=gateway_session_key,
)
idempotency_key = request.headers.get("Idempotency-Key")
@ -2016,7 +2120,10 @@ class APIServerAdapter(BasePlatformAdapter):
if conversation:
self._response_store.set_conversation(conversation, response_id)
return web.json_response(response_data)
response_headers = {"X-Hermes-Session-Id": session_id}
if gateway_session_key:
response_headers["X-Hermes-Session-Key"] = gateway_session_key
return web.json_response(response_data, headers=response_headers)
# ------------------------------------------------------------------
# GET / DELETE response endpoints
@ -2338,6 +2445,7 @@ class APIServerAdapter(BasePlatformAdapter):
tool_start_callback=None,
tool_complete_callback=None,
agent_ref: Optional[list] = None,
gateway_session_key: Optional[str] = None,
) -> tuple:
"""
Create an agent and run a conversation in a thread executor.
@ -2360,6 +2468,7 @@ class APIServerAdapter(BasePlatformAdapter):
tool_progress_callback=tool_progress_callback,
tool_start_callback=tool_start_callback,
tool_complete_callback=tool_complete_callback,
gateway_session_key=gateway_session_key,
)
if agent_ref is not None:
agent_ref[0] = agent
@ -2453,6 +2562,11 @@ class APIServerAdapter(BasePlatformAdapter):
if auth_err:
return auth_err
# Long-term memory scope header (see chat_completions for details).
gateway_session_key, key_err = self._parse_session_key_header(request)
if key_err is not None:
return key_err
# Enforce concurrency limit
if len(self._run_streams) >= self._MAX_CONCURRENT_RUNS:
return web.json_response(
@ -2561,6 +2675,7 @@ class APIServerAdapter(BasePlatformAdapter):
session_id=session_id,
stream_delta_callback=_text_cb,
tool_progress_callback=event_cb,
gateway_session_key=gateway_session_key,
)
self._active_run_agents[run_id] = agent
def _run_sync():
@ -2661,7 +2776,14 @@ class APIServerAdapter(BasePlatformAdapter):
if hasattr(task, "add_done_callback"):
task.add_done_callback(self._background_tasks.discard)
return web.json_response({"run_id": run_id, "status": "started"}, status=202)
response_headers = (
{"X-Hermes-Session-Key": gateway_session_key} if gateway_session_key else {}
)
return web.json_response(
{"run_id": run_id, "status": "started"},
status=202,
headers=response_headers,
)
async def _handle_get_run(self, request: "web.Request") -> "web.Response":
"""GET /v1/runs/{run_id} — return pollable run status for external UIs."""

View file

@ -63,6 +63,7 @@ AUTHOR_MAP = {
"chengoak@users.noreply.github.com": "chengoak",
"mrhanoi@outlook.com": "qxxaa",
"emelyanenko.kirill@gmail.com": "EmelyanenkoK",
"lazycat.manatee@gmail.com": "manateelazycat",
# Matrix parity salvage batch (April 2026)
"sr@samirusani": "samrusani",
"angelclaw@AngelMacBook.local": "angel12",

View file

@ -2563,3 +2563,185 @@ class TestSessionIdHeader:
call_kwargs = mock_run.call_args.kwargs
assert call_kwargs["conversation_history"] == []
assert call_kwargs["session_id"] == "some-session"
# ---------------------------------------------------------------------------
# X-Hermes-Session-Key header (long-term memory scoping)
# ---------------------------------------------------------------------------
class TestSessionKeyHeader:
"""The session key is a stable per-channel identifier that scopes
long-term memory (e.g. Honcho) independently of the transcript-scoped
session_id. A third-party Web UI passes one stable key per assistant
channel and rotates session_id on /new, matching the native
gateway's session_key / session_id split.
"""
@pytest.mark.asyncio
async def test_session_key_passed_to_agent_and_echoed(self, auth_adapter):
"""X-Hermes-Session-Key reaches _run_agent as gateway_session_key and is echoed back."""
mock_result = {"final_response": "ok", "messages": [], "api_calls": 1}
app = _create_app(auth_adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(auth_adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0})
resp = await cli.post(
"/v1/chat/completions",
headers={
"X-Hermes-Session-Key": "webui:user-42",
"Authorization": "Bearer sk-secret",
},
json={"model": "hermes-agent", "messages": [{"role": "user", "content": "hi"}]},
)
assert resp.status == 200
assert resp.headers.get("X-Hermes-Session-Key") == "webui:user-42"
call_kwargs = mock_run.call_args.kwargs
assert call_kwargs["gateway_session_key"] == "webui:user-42"
@pytest.mark.asyncio
async def test_session_key_independent_of_session_id(self, auth_adapter):
"""Both headers coexist: key scopes memory, id scopes transcript."""
mock_result = {"final_response": "ok", "messages": [], "api_calls": 1}
mock_db = MagicMock()
mock_db.get_messages_as_conversation.return_value = []
auth_adapter._session_db = mock_db
app = _create_app(auth_adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(auth_adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0})
resp = await cli.post(
"/v1/chat/completions",
headers={
"X-Hermes-Session-Key": "channel-abc",
"X-Hermes-Session-Id": "transcript-xyz",
"Authorization": "Bearer sk-secret",
},
json={"model": "hermes-agent", "messages": [{"role": "user", "content": "hi"}]},
)
assert resp.status == 200
assert resp.headers.get("X-Hermes-Session-Key") == "channel-abc"
assert resp.headers.get("X-Hermes-Session-Id") == "transcript-xyz"
call_kwargs = mock_run.call_args.kwargs
assert call_kwargs["gateway_session_key"] == "channel-abc"
assert call_kwargs["session_id"] == "transcript-xyz"
@pytest.mark.asyncio
async def test_session_key_absent_yields_none(self, auth_adapter):
"""Omitting the header passes gateway_session_key=None and doesn't echo."""
mock_result = {"final_response": "ok", "messages": [], "api_calls": 1}
app = _create_app(auth_adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(auth_adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0})
resp = await cli.post(
"/v1/chat/completions",
headers={"Authorization": "Bearer sk-secret"},
json={"model": "hermes-agent", "messages": [{"role": "user", "content": "hi"}]},
)
assert resp.status == 200
assert "X-Hermes-Session-Key" not in resp.headers
call_kwargs = mock_run.call_args.kwargs
assert call_kwargs["gateway_session_key"] is None
@pytest.mark.asyncio
async def test_session_key_rejected_without_api_key(self, adapter):
"""Without API_SERVER_KEY, accepting a caller-supplied memory scope is unsafe — reject with 403."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post(
"/v1/chat/completions",
headers={"X-Hermes-Session-Key": "whatever"},
json={"model": "hermes-agent", "messages": [{"role": "user", "content": "hi"}]},
)
assert resp.status == 403
@pytest.mark.asyncio
async def test_session_key_rejects_control_chars(self, auth_adapter):
"""Header injection via \\r\\n must be rejected by the server-side validator.
Note: aiohttp client refuses to SEND a header containing CR/LF
(that check fires before the request leaves the client), so we
can't reach this code path through TestClient. Test the helper
directly instead with a raw request that bypasses client-side
validation.
"""
mock_request = MagicMock()
mock_request.headers = {"X-Hermes-Session-Key": "bad\rvalue"}
key, err = auth_adapter._parse_session_key_header(mock_request)
assert key is None
assert err is not None
assert err.status == 400
@pytest.mark.asyncio
async def test_session_key_rejects_oversized(self, auth_adapter):
"""Session keys longer than the cap are rejected."""
app = _create_app(auth_adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post(
"/v1/chat/completions",
headers={"X-Hermes-Session-Key": "x" * 1000, "Authorization": "Bearer sk-secret"},
json={"model": "hermes-agent", "messages": [{"role": "user", "content": "hi"}]},
)
assert resp.status == 400
@pytest.mark.asyncio
async def test_session_key_threads_into_create_agent(self, auth_adapter):
"""End-to-end: verify AIAgent(gateway_session_key=...) receives the key via _create_agent."""
captured_kwargs = {}
def _fake_create_agent(**kwargs):
captured_kwargs.update(kwargs)
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok", "messages": []}
mock_agent.session_prompt_tokens = 0
mock_agent.session_completion_tokens = 0
mock_agent.session_total_tokens = 0
return mock_agent
app = _create_app(auth_adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(auth_adapter, "_create_agent", side_effect=_fake_create_agent):
resp = await cli.post(
"/v1/chat/completions",
headers={
"X-Hermes-Session-Key": "agent:main:webui:dm:user-7",
"Authorization": "Bearer sk-secret",
},
json={"model": "hermes-agent", "messages": [{"role": "user", "content": "hi"}]},
)
assert resp.status == 200
# _create_agent must be called with gateway_session_key threaded through
assert captured_kwargs.get("gateway_session_key") == "agent:main:webui:dm:user-7"
@pytest.mark.asyncio
async def test_responses_endpoint_accepts_session_key(self, auth_adapter):
"""Responses API honors the same X-Hermes-Session-Key contract."""
mock_result = {"final_response": "ok", "messages": [], "api_calls": 1}
app = _create_app(auth_adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(auth_adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0})
resp = await cli.post(
"/v1/responses",
headers={
"X-Hermes-Session-Key": "webui:chan-1",
"Authorization": "Bearer sk-secret",
},
json={"model": "hermes-agent", "input": "hello", "store": False},
)
assert resp.status == 200
assert resp.headers.get("X-Hermes-Session-Key") == "webui:chan-1"
call_kwargs = mock_run.call_args.kwargs
assert call_kwargs["gateway_session_key"] == "webui:chan-1"
@pytest.mark.asyncio
async def test_capabilities_advertises_session_key_header(self, adapter):
"""GET /v1/capabilities should advertise the new header so clients can feature-detect."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.get("/v1/capabilities")
assert resp.status == 200
data = await resp.json()
assert data["features"]["session_key_header"] == "X-Hermes-Session-Key"