From c052cf0eea054920619c3690123310abb6443a86 Mon Sep 17 00:00:00 2001 From: Dusk1e Date: Sun, 12 Apr 2026 14:13:13 +0300 Subject: [PATCH] fix(security): validate domain/service params in ha_call_service to prevent path traversal --- tests/tools/test_homeassistant_tool.py | 88 ++++++++++++++++++++++++++ tools/homeassistant_tool.py | 17 +++++ 2 files changed, 105 insertions(+) diff --git a/tests/tools/test_homeassistant_tool.py b/tests/tools/test_homeassistant_tool.py index b136b5653..e18dcb385 100644 --- a/tests/tools/test_homeassistant_tool.py +++ b/tests/tools/test_homeassistant_tool.py @@ -18,6 +18,7 @@ from tools.homeassistant_tool import ( _handle_call_service, _BLOCKED_DOMAINS, _ENTITY_ID_RE, + _SERVICE_NAME_RE, ) @@ -303,6 +304,93 @@ class TestEntityIdValidation: assert "Invalid entity_id" not in result["error"] +# --------------------------------------------------------------------------- +# Security: domain/service name format validation +# --------------------------------------------------------------------------- + + +class TestServiceNameValidation: + """Verify domain/service format validation prevents path traversal in URL. + + The domain and service parameters are interpolated into + /api/services/{domain}/{service}, so allowing arbitrary strings would + enable SSRF via path traversal or blocked-domain bypass. + """ + + def test_valid_domain_names(self): + assert _SERVICE_NAME_RE.match("light") + assert _SERVICE_NAME_RE.match("switch") + assert _SERVICE_NAME_RE.match("climate") + assert _SERVICE_NAME_RE.match("shell_command") + assert _SERVICE_NAME_RE.match("media_player") + + def test_valid_service_names(self): + assert _SERVICE_NAME_RE.match("turn_on") + assert _SERVICE_NAME_RE.match("turn_off") + assert _SERVICE_NAME_RE.match("set_temperature") + assert _SERVICE_NAME_RE.match("toggle") + + def test_path_traversal_in_domain_rejected(self): + assert _SERVICE_NAME_RE.match("../../api/config") is None + assert _SERVICE_NAME_RE.match("light/../../../etc") is None + assert _SERVICE_NAME_RE.match("../config") is None + + def test_path_traversal_in_service_rejected(self): + assert _SERVICE_NAME_RE.match("../../api/config") is None + assert _SERVICE_NAME_RE.match("turn_on/../../config") is None + + def test_blocked_domain_bypass_via_traversal_rejected(self): + """Ensure shell_command/../light is rejected, not just checked against blocklist.""" + assert _SERVICE_NAME_RE.match("shell_command/../light") is None + assert _SERVICE_NAME_RE.match("python_script/../scene") is None + assert _SERVICE_NAME_RE.match("hassio/../automation") is None + + def test_slashes_rejected(self): + assert _SERVICE_NAME_RE.match("light/turn_on") is None + assert _SERVICE_NAME_RE.match("a/b/c") is None + + def test_dots_rejected(self): + assert _SERVICE_NAME_RE.match("light.turn_on") is None + assert _SERVICE_NAME_RE.match("..") is None + + def test_uppercase_rejected(self): + assert _SERVICE_NAME_RE.match("LIGHT") is None + assert _SERVICE_NAME_RE.match("Turn_On") is None + + def test_special_chars_rejected(self): + assert _SERVICE_NAME_RE.match("light;rm") is None + assert _SERVICE_NAME_RE.match("light&cmd") is None + assert _SERVICE_NAME_RE.match("light cmd") is None + + def test_handler_rejects_traversal_domain(self): + """_handle_call_service must reject domain with path traversal.""" + result = json.loads(_handle_call_service({ + "domain": "../../api/config", + "service": "turn_on", + })) + assert "error" in result + assert "Invalid domain" in result["error"] + + def test_handler_rejects_traversal_service(self): + """_handle_call_service must reject service with path traversal.""" + result = json.loads(_handle_call_service({ + "domain": "light", + "service": "../../api/config", + })) + assert "error" in result + assert "Invalid service" in result["error"] + + def test_handler_rejects_blocklist_bypass_traversal(self): + """Blocklist bypass via shell_command/../light must be caught by format validation.""" + result = json.loads(_handle_call_service({ + "domain": "shell_command/../light", + "service": "turn_on", + })) + assert "error" in result + # Must be rejected as "Invalid domain", not slip through the blocklist + assert "Invalid domain" in result["error"] + + # --------------------------------------------------------------------------- # Availability check # --------------------------------------------------------------------------- diff --git a/tools/homeassistant_tool.py b/tools/homeassistant_tool.py index 0ab99b4bf..bf5514de1 100644 --- a/tools/homeassistant_tool.py +++ b/tools/homeassistant_tool.py @@ -38,6 +38,15 @@ def _get_config(): # Regex for valid HA entity_id format (e.g. "light.living_room", "sensor.temperature_1") _ENTITY_ID_RE = re.compile(r"^[a-z_][a-z0-9_]*\.[a-z0-9_]+$") +# Regex for valid HA service/domain names (e.g. "light", "turn_on", "shell_command"). +# Only lowercase ASCII letters, digits, and underscores — no slashes, dots, or +# other characters that could allow path traversal in URL construction. +# The domain and service are interpolated into /api/services/{domain}/{service}, +# so allowing arbitrary strings would enable SSRF via path traversal +# (e.g. domain="../../api/config") or blocked-domain bypass +# (e.g. domain="shell_command/../light"). +_SERVICE_NAME_RE = re.compile(r"^[a-z][a-z0-9_]*$") + # Service domains blocked for security -- these allow arbitrary code/command # execution on the HA host or enable SSRF attacks on the local network. # HA provides zero service-level access control; all safety must be in our layer. @@ -246,6 +255,14 @@ def _handle_call_service(args: dict, **kw) -> str: if not domain or not service: return tool_error("Missing required parameters: domain and service") + # Validate domain/service format BEFORE the blocklist check — prevents + # path traversal in /api/services/{domain}/{service} and blocklist bypass + # via payloads like "shell_command/../light". + if not _SERVICE_NAME_RE.match(domain): + return tool_error(f"Invalid domain format: {domain!r}") + if not _SERVICE_NAME_RE.match(service): + return tool_error(f"Invalid service format: {service!r}") + if domain in _BLOCKED_DOMAINS: return json.dumps({ "error": f"Service domain '{domain}' is blocked for security. "