fix(security): wire session-id sanitizer into artifact paths + API boundary

Defense-in-depth on top of _safe_session_filename_component (#5958):

Sink (makes the bad write impossible regardless of entry point):
- run_agent._save_session_log: sanitize session_id before building the
  session_{sid}.json snapshot path.
- agent_runtime_helpers.dump_api_request_debug: sanitize before building
  the request_dump_{sid}_{ts}.json path.

Boundary (clean 400 instead of a silently-hashed filename):
- api_server rejects path-traversal-shaped X-Hermes-Session-Id on the
  session-continuation path and the explicit /api/sessions create path,
  reusing gateway.session._is_path_unsafe (mirrors the native gateway's
  entry-boundary guard). Also enforces the session-header length cap on
  the continuation path.

Tests: traversal session_id stays contained at the write site; sanitizer
always yields a traversal-free segment; the API header rejects
../, absolute, and Windows-traversal IDs with 400.
This commit is contained in:
teknium1 2026-06-29 04:08:45 -07:00 committed by Teknium
parent 1debd5e8f9
commit ea1372d2af
5 changed files with 71 additions and 6 deletions

View file

@ -1281,7 +1281,11 @@ def dump_api_request_debug(
dump_payload["error"] = error_info
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
dump_file = agent.logs_dir / f"request_dump_{agent.session_id}_{timestamp}.json"
# Sanitize the session ID into a traversal-free path segment — it can
# originate from untrusted input (X-Hermes-Session-Id header), and an
# unsanitized "../"-shaped ID would write the dump outside logs_dir.
safe_sid = _ra()._safe_session_filename_component(agent.session_id)
dump_file = agent.logs_dir / f"request_dump_{safe_sid}_{timestamp}.json"
# Redact secrets before persisting/printing. This dump captures the
# full request body (system prompt, tool defs, context-embedded

View file

@ -1490,7 +1490,8 @@ class APIServerAdapter(BasePlatformAdapter):
raw_id = body.get("id") or body.get("session_id")
session_id = str(raw_id).strip() if raw_id else f"api_{int(time.time())}_{uuid.uuid4().hex[:8]}"
if not session_id or re.search(r'[\r\n\x00]', session_id):
from gateway.session import _is_path_unsafe
if not session_id or re.search(r'[\r\n\x00]', session_id) or _is_path_unsafe(session_id):
return web.json_response(_openai_error("Invalid session ID", code="invalid_session_id"), status=400)
if len(session_id) > self._MAX_SESSION_HEADER_LEN:
return web.json_response(_openai_error("Session ID too long", code="invalid_session_id"), status=400)
@ -1905,12 +1906,22 @@ class APIServerAdapter(BasePlatformAdapter):
),
status=403,
)
# Sanitize: reject control characters that could enable header injection.
if re.search(r'[\r\n\x00]', provided_session_id):
# Sanitize: reject control characters that could enable header
# injection, and path-traversal-shaped IDs that would escape the
# sessions directory when interpolated into on-disk artifact
# filenames (session snapshots, request dumps). Mirrors the native
# gateway's entry-boundary guard (gateway.session._is_path_unsafe).
from gateway.session import _is_path_unsafe
if re.search(r'[\r\n\x00]', provided_session_id) or _is_path_unsafe(provided_session_id):
return web.json_response(
{"error": {"message": "Invalid session ID", "type": "invalid_request_error"}},
status=400,
)
if len(provided_session_id) > self._MAX_SESSION_HEADER_LEN:
return web.json_response(
{"error": {"message": "Session ID too long", "type": "invalid_request_error"}},
status=400,
)
session_id = provided_session_id
try:
db = self._ensure_session_db()

View file

@ -2387,9 +2387,13 @@ class AIAgent:
# Re-derive the target path each call so /branch and /compress
# session-id changes land in the right file without any re-point
# bookkeeping at the call sites.
# bookkeeping at the call sites. Sanitize the session ID into a
# single traversal-free path segment — session IDs can come from
# untrusted input (X-Hermes-Session-Id header) and must not escape
# the sessions directory.
try:
log_file = self.logs_dir / f"session_{self.session_id}.json"
safe_sid = _safe_session_filename_component(self.session_id)
log_file = self.logs_dir / f"session_{safe_sid}.json"
except Exception:
return

View file

@ -3454,6 +3454,24 @@ class TestSessionIdHeader:
call_kwargs = mock_run.call_args.kwargs
assert call_kwargs["session_id"] == "my-session-123"
@pytest.mark.asyncio
async def test_traversal_session_id_header_rejected(self, auth_adapter):
"""Security (#5958): a path-traversal X-Hermes-Session-Id must be
rejected with 400 so it can't reach the filesystem artifact paths
(session snapshot / request dump) and escape the sessions dir."""
app = _create_app(auth_adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(auth_adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
for bad in ("../../../../etc/pwned", "/abs/path", "..\\win"):
resp = await cli.post(
"/v1/chat/completions",
headers={"X-Hermes-Session-Id": bad, "Authorization": "Bearer sk-secret"},
json={"model": "hermes-agent", "messages": [{"role": "user", "content": "hi"}]},
)
assert resp.status == 400, f"{bad!r} should be rejected"
# The agent is never invoked for a rejected ID.
assert mock_run.call_count == 0
@pytest.mark.asyncio
async def test_provided_session_id_loads_history_from_db(self, auth_adapter):
"""When X-Hermes-Session-Id is provided, history comes from SessionDB not request body."""

View file

@ -629,6 +629,34 @@ class TestSessionJsonSnapshotOptIn:
# the session JSON opt-in.
assert hasattr(agent, "logs_dir")
def test_traversal_session_id_cannot_escape_logs_dir(self, agent, tmp_path):
# Security regression (#5958): a traversal-shaped session ID (which can
# originate from the untrusted X-Hermes-Session-Id API header) must not
# redirect the session snapshot outside the sessions directory.
agent._session_json_enabled = True
agent.logs_dir = tmp_path
agent.session_id = "../../../../outside_dir/pwned"
agent._save_session_log([{"role": "user", "content": "hello"}])
# Exactly one snapshot, and it lives directly under logs_dir.
written = list(tmp_path.glob("session_*.json"))
assert len(written) == 1, "writer must produce a single contained snapshot"
assert written[0].resolve().parent == tmp_path.resolve()
# Nothing escaped to the traversal target.
assert not (tmp_path.parent.parent / "outside_dir").exists()
def test_safe_session_filename_component_contains_traversal(self):
# The sanitizer is the chokepoint: every session-ID-derived artifact
# path goes through it, so it must always yield a single, traversal-free
# path segment while leaving legitimate IDs untouched.
f = run_agent._safe_session_filename_component
for raw in ("../../etc/passwd", "/abs/path", "..\\win\\trav", "a/b/c"):
out = f(raw)
assert "/" not in out and "\\" not in out and ".." not in out, out
# Legit IDs pass through unchanged; distinct IDs never collide.
assert f("api-abc123def456") == "api-abc123def456"
assert f("../a") != f("../b")
class TestSaveSessionLogRedactsSecrets:
"""Regression: session_*.json must not contain plaintext credentials (#19798, #19845)."""