fix(V-009): reject path traversal in SessionEntry.from_dict and harden _ensure_loaded

Addresses PR #9560 review comments: applies the CWE-22 fix to current main
(post-PR #458 rebase) and adds the requested regression tests.

- SessionEntry.from_dict now raises ValueError for session_key or session_id
  containing '..' or starting with '/' or '\' (directory traversal guard)
- SessionStore._ensure_loaded moves per-entry validation inside the loop so
  one malicious/corrupt entry is skipped with a warning instead of aborting
  the entire sessions.json load
- Adds TestSessionEntryFromDictTraversalValidation (5 cases) and
  TestEnsureLoadedSkipsInvalidEntries covering the skip-not-abort behavior

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OrbisAI Security 2026-06-05 10:28:06 +05:30 committed by Teknium
parent c8eb7cf843
commit 3a6a43cb81
2 changed files with 78 additions and 7 deletions

View file

@ -576,7 +576,7 @@ class SessionEntry:
session_key = data["session_key"]
session_id = data["session_id"]
# Validate path-sensitive fields to prevent directory traversal attacks
# Validate path-sensitive fields to prevent directory traversal (CWE-22)
for _field, _val in (("session_key", session_key), ("session_id", session_id)):
if _val and (".." in str(_val) or str(_val).startswith(("/", "\\"))):
raise ValueError(
@ -786,12 +786,11 @@ class SessionStore:
try:
with open(sessions_file, "r", encoding="utf-8") as f:
data = json.load(f)
for key, entry_data in data.items():
try:
self._entries[key] = SessionEntry.from_dict(entry_data)
except (ValueError, KeyError):
# Skip entries with unknown/removed platform values
continue
for key, entry_data in data.items():
try:
self._entries[key] = SessionEntry.from_dict(entry_data)
except (ValueError, KeyError) as e:
print(f"[gateway] Warning: Skipping invalid session entry {key!r}: {e}")
except Exception as e:
print(f"[gateway] Warning: Failed to load sessions: {e}")

View file

@ -1046,6 +1046,78 @@ class TestWhatsAppIdentifierPublicHelpers:
assert canonical_whatsapp_identifier("") == ""
class TestSessionEntryFromDictTraversalValidation:
"""Regression: from_dict must reject traversal sequences in session_key/session_id."""
BASE = {
"session_key": "agent:main:local:dm",
"session_id": "abc123",
"created_at": "2026-01-01T00:00:00",
"updated_at": "2026-01-01T00:00:00",
}
def _entry(self, **overrides):
from gateway.session import SessionEntry
return {**self.BASE, **overrides}
def test_valid_entry_loads(self):
from gateway.session import SessionEntry
entry = SessionEntry.from_dict(self._entry())
assert entry.session_id == "abc123"
def test_session_id_dotdot_raises(self):
from gateway.session import SessionEntry
with pytest.raises(ValueError, match="session_id"):
SessionEntry.from_dict(self._entry(session_id="../../etc/passwd"))
def test_session_key_dotdot_raises(self):
from gateway.session import SessionEntry
with pytest.raises(ValueError, match="session_key"):
SessionEntry.from_dict(self._entry(session_key="agent:main:../../secret"))
def test_session_id_absolute_unix_raises(self):
from gateway.session import SessionEntry
with pytest.raises(ValueError, match="session_id"):
SessionEntry.from_dict(self._entry(session_id="/etc/passwd"))
def test_session_id_absolute_windows_raises(self):
from gateway.session import SessionEntry
with pytest.raises(ValueError, match="session_id"):
SessionEntry.from_dict(self._entry(session_id="\\windows\\system32\\config"))
class TestEnsureLoadedSkipsInvalidEntries:
"""Regression: one bad sessions.json entry must not block valid entries from loading."""
def test_invalid_entry_skipped_valid_entry_loads(self, tmp_path):
import json
from gateway.session import SessionStore
from gateway.config import GatewayConfig
sessions_file = tmp_path / "sessions.json"
sessions_file.write_text(json.dumps({
"bad:key": {
"session_key": "bad:key",
"session_id": "../../evil",
"created_at": "2026-01-01T00:00:00",
"updated_at": "2026-01-01T00:00:00",
},
"agent:main:local:dm": {
"session_key": "agent:main:local:dm",
"session_id": "good123",
"created_at": "2026-01-01T00:00:00",
"updated_at": "2026-01-01T00:00:00",
},
}), encoding="utf-8")
store = SessionStore(sessions_dir=tmp_path, config=GatewayConfig())
store._ensure_loaded()
assert "bad:key" not in store._entries
assert "agent:main:local:dm" in store._entries
assert store._entries["agent:main:local:dm"].session_id == "good123"
class TestSessionStoreEntriesAttribute:
"""Regression: /reset must access _entries, not _sessions."""