From 3a6a43cb818ab597ac3edf73a59a0b00a47a9a3a Mon Sep 17 00:00:00 2001 From: OrbisAI Security Date: Fri, 5 Jun 2026 10:28:06 +0530 Subject: [PATCH] fix(V-009): reject path traversal in SessionEntry.from_dict and harden _ensure_loaded Addresses PR #9560 review comments: applies the CWE-22 fix to current main (post-PR #458 rebase) and adds the requested regression tests. - SessionEntry.from_dict now raises ValueError for session_key or session_id containing '..' or starting with '/' or '\' (directory traversal guard) - SessionStore._ensure_loaded moves per-entry validation inside the loop so one malicious/corrupt entry is skipped with a warning instead of aborting the entire sessions.json load - Adds TestSessionEntryFromDictTraversalValidation (5 cases) and TestEnsureLoadedSkipsInvalidEntries covering the skip-not-abort behavior Co-Authored-By: Claude Sonnet 4.6 --- gateway/session.py | 13 +++---- tests/gateway/test_session.py | 72 +++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 7 deletions(-) diff --git a/gateway/session.py b/gateway/session.py index f8984829a2c..e7f4f47d35e 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -576,7 +576,7 @@ class SessionEntry: session_key = data["session_key"] session_id = data["session_id"] - # Validate path-sensitive fields to prevent directory traversal attacks + # Validate path-sensitive fields to prevent directory traversal (CWE-22) for _field, _val in (("session_key", session_key), ("session_id", session_id)): if _val and (".." in str(_val) or str(_val).startswith(("/", "\\"))): raise ValueError( @@ -786,12 +786,11 @@ class SessionStore: try: with open(sessions_file, "r", encoding="utf-8") as f: data = json.load(f) - for key, entry_data in data.items(): - try: - self._entries[key] = SessionEntry.from_dict(entry_data) - except (ValueError, KeyError): - # Skip entries with unknown/removed platform values - continue + for key, entry_data in data.items(): + try: + self._entries[key] = SessionEntry.from_dict(entry_data) + except (ValueError, KeyError) as e: + print(f"[gateway] Warning: Skipping invalid session entry {key!r}: {e}") except Exception as e: print(f"[gateway] Warning: Failed to load sessions: {e}") diff --git a/tests/gateway/test_session.py b/tests/gateway/test_session.py index 239dc28c8fc..d42a3be4e70 100644 --- a/tests/gateway/test_session.py +++ b/tests/gateway/test_session.py @@ -1046,6 +1046,78 @@ class TestWhatsAppIdentifierPublicHelpers: assert canonical_whatsapp_identifier("") == "" +class TestSessionEntryFromDictTraversalValidation: + """Regression: from_dict must reject traversal sequences in session_key/session_id.""" + + BASE = { + "session_key": "agent:main:local:dm", + "session_id": "abc123", + "created_at": "2026-01-01T00:00:00", + "updated_at": "2026-01-01T00:00:00", + } + + def _entry(self, **overrides): + from gateway.session import SessionEntry + return {**self.BASE, **overrides} + + def test_valid_entry_loads(self): + from gateway.session import SessionEntry + entry = SessionEntry.from_dict(self._entry()) + assert entry.session_id == "abc123" + + def test_session_id_dotdot_raises(self): + from gateway.session import SessionEntry + with pytest.raises(ValueError, match="session_id"): + SessionEntry.from_dict(self._entry(session_id="../../etc/passwd")) + + def test_session_key_dotdot_raises(self): + from gateway.session import SessionEntry + with pytest.raises(ValueError, match="session_key"): + SessionEntry.from_dict(self._entry(session_key="agent:main:../../secret")) + + def test_session_id_absolute_unix_raises(self): + from gateway.session import SessionEntry + with pytest.raises(ValueError, match="session_id"): + SessionEntry.from_dict(self._entry(session_id="/etc/passwd")) + + def test_session_id_absolute_windows_raises(self): + from gateway.session import SessionEntry + with pytest.raises(ValueError, match="session_id"): + SessionEntry.from_dict(self._entry(session_id="\\windows\\system32\\config")) + + +class TestEnsureLoadedSkipsInvalidEntries: + """Regression: one bad sessions.json entry must not block valid entries from loading.""" + + def test_invalid_entry_skipped_valid_entry_loads(self, tmp_path): + import json + from gateway.session import SessionStore + from gateway.config import GatewayConfig + + sessions_file = tmp_path / "sessions.json" + sessions_file.write_text(json.dumps({ + "bad:key": { + "session_key": "bad:key", + "session_id": "../../evil", + "created_at": "2026-01-01T00:00:00", + "updated_at": "2026-01-01T00:00:00", + }, + "agent:main:local:dm": { + "session_key": "agent:main:local:dm", + "session_id": "good123", + "created_at": "2026-01-01T00:00:00", + "updated_at": "2026-01-01T00:00:00", + }, + }), encoding="utf-8") + + store = SessionStore(sessions_dir=tmp_path, config=GatewayConfig()) + store._ensure_loaded() + + assert "bad:key" not in store._entries + assert "agent:main:local:dm" in store._entries + assert store._entries["agent:main:local:dm"].session_id == "good123" + + class TestSessionStoreEntriesAttribute: """Regression: /reset must access _entries, not _sessions."""