fix(security): validate domain/service params in ha_call_service to prevent path traversal

This commit is contained in:
Dusk1e 2026-04-12 14:13:13 +03:00 committed by Teknium
parent 8a64f3e368
commit c052cf0eea
2 changed files with 105 additions and 0 deletions

View file

@ -18,6 +18,7 @@ from tools.homeassistant_tool import (
_handle_call_service, _handle_call_service,
_BLOCKED_DOMAINS, _BLOCKED_DOMAINS,
_ENTITY_ID_RE, _ENTITY_ID_RE,
_SERVICE_NAME_RE,
) )
@ -303,6 +304,93 @@ class TestEntityIdValidation:
assert "Invalid entity_id" not in result["error"] assert "Invalid entity_id" not in result["error"]
# ---------------------------------------------------------------------------
# Security: domain/service name format validation
# ---------------------------------------------------------------------------
class TestServiceNameValidation:
"""Verify domain/service format validation prevents path traversal in URL.
The domain and service parameters are interpolated into
/api/services/{domain}/{service}, so allowing arbitrary strings would
enable SSRF via path traversal or blocked-domain bypass.
"""
def test_valid_domain_names(self):
assert _SERVICE_NAME_RE.match("light")
assert _SERVICE_NAME_RE.match("switch")
assert _SERVICE_NAME_RE.match("climate")
assert _SERVICE_NAME_RE.match("shell_command")
assert _SERVICE_NAME_RE.match("media_player")
def test_valid_service_names(self):
assert _SERVICE_NAME_RE.match("turn_on")
assert _SERVICE_NAME_RE.match("turn_off")
assert _SERVICE_NAME_RE.match("set_temperature")
assert _SERVICE_NAME_RE.match("toggle")
def test_path_traversal_in_domain_rejected(self):
assert _SERVICE_NAME_RE.match("../../api/config") is None
assert _SERVICE_NAME_RE.match("light/../../../etc") is None
assert _SERVICE_NAME_RE.match("../config") is None
def test_path_traversal_in_service_rejected(self):
assert _SERVICE_NAME_RE.match("../../api/config") is None
assert _SERVICE_NAME_RE.match("turn_on/../../config") is None
def test_blocked_domain_bypass_via_traversal_rejected(self):
"""Ensure shell_command/../light is rejected, not just checked against blocklist."""
assert _SERVICE_NAME_RE.match("shell_command/../light") is None
assert _SERVICE_NAME_RE.match("python_script/../scene") is None
assert _SERVICE_NAME_RE.match("hassio/../automation") is None
def test_slashes_rejected(self):
assert _SERVICE_NAME_RE.match("light/turn_on") is None
assert _SERVICE_NAME_RE.match("a/b/c") is None
def test_dots_rejected(self):
assert _SERVICE_NAME_RE.match("light.turn_on") is None
assert _SERVICE_NAME_RE.match("..") is None
def test_uppercase_rejected(self):
assert _SERVICE_NAME_RE.match("LIGHT") is None
assert _SERVICE_NAME_RE.match("Turn_On") is None
def test_special_chars_rejected(self):
assert _SERVICE_NAME_RE.match("light;rm") is None
assert _SERVICE_NAME_RE.match("light&cmd") is None
assert _SERVICE_NAME_RE.match("light cmd") is None
def test_handler_rejects_traversal_domain(self):
"""_handle_call_service must reject domain with path traversal."""
result = json.loads(_handle_call_service({
"domain": "../../api/config",
"service": "turn_on",
}))
assert "error" in result
assert "Invalid domain" in result["error"]
def test_handler_rejects_traversal_service(self):
"""_handle_call_service must reject service with path traversal."""
result = json.loads(_handle_call_service({
"domain": "light",
"service": "../../api/config",
}))
assert "error" in result
assert "Invalid service" in result["error"]
def test_handler_rejects_blocklist_bypass_traversal(self):
"""Blocklist bypass via shell_command/../light must be caught by format validation."""
result = json.loads(_handle_call_service({
"domain": "shell_command/../light",
"service": "turn_on",
}))
assert "error" in result
# Must be rejected as "Invalid domain", not slip through the blocklist
assert "Invalid domain" in result["error"]
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Availability check # Availability check
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View file

@ -38,6 +38,15 @@ def _get_config():
# Regex for valid HA entity_id format (e.g. "light.living_room", "sensor.temperature_1") # Regex for valid HA entity_id format (e.g. "light.living_room", "sensor.temperature_1")
_ENTITY_ID_RE = re.compile(r"^[a-z_][a-z0-9_]*\.[a-z0-9_]+$") _ENTITY_ID_RE = re.compile(r"^[a-z_][a-z0-9_]*\.[a-z0-9_]+$")
# Regex for valid HA service/domain names (e.g. "light", "turn_on", "shell_command").
# Only lowercase ASCII letters, digits, and underscores — no slashes, dots, or
# other characters that could allow path traversal in URL construction.
# The domain and service are interpolated into /api/services/{domain}/{service},
# so allowing arbitrary strings would enable SSRF via path traversal
# (e.g. domain="../../api/config") or blocked-domain bypass
# (e.g. domain="shell_command/../light").
_SERVICE_NAME_RE = re.compile(r"^[a-z][a-z0-9_]*$")
# Service domains blocked for security -- these allow arbitrary code/command # Service domains blocked for security -- these allow arbitrary code/command
# execution on the HA host or enable SSRF attacks on the local network. # execution on the HA host or enable SSRF attacks on the local network.
# HA provides zero service-level access control; all safety must be in our layer. # HA provides zero service-level access control; all safety must be in our layer.
@ -246,6 +255,14 @@ def _handle_call_service(args: dict, **kw) -> str:
if not domain or not service: if not domain or not service:
return tool_error("Missing required parameters: domain and service") return tool_error("Missing required parameters: domain and service")
# Validate domain/service format BEFORE the blocklist check — prevents
# path traversal in /api/services/{domain}/{service} and blocklist bypass
# via payloads like "shell_command/../light".
if not _SERVICE_NAME_RE.match(domain):
return tool_error(f"Invalid domain format: {domain!r}")
if not _SERVICE_NAME_RE.match(service):
return tool_error(f"Invalid service format: {service!r}")
if domain in _BLOCKED_DOMAINS: if domain in _BLOCKED_DOMAINS:
return json.dumps({ return json.dumps({
"error": f"Service domain '{domain}' is blocked for security. " "error": f"Service domain '{domain}' is blocked for security. "