test(webhook): regression cases for empty-secret HMAC bypass

Covers _reload_dynamic_routes() rejecting empty or missing per-route
secrets when no global fallback exists, preserving the INSECURE_NO_AUTH
opt-in, inheriting a global secret when only the per-route value is
missing, and partial-skip when only one of multiple routes is bad.
This commit is contained in:
Teknium 2026-05-22 03:40:45 -07:00
parent 9c90b3a597
commit 3fc715ddf5

View file

@ -6,7 +6,11 @@ import pytest
from pathlib import Path
from gateway.config import PlatformConfig
from gateway.platforms.webhook import WebhookAdapter, _DYNAMIC_ROUTES_FILENAME
from gateway.platforms.webhook import (
WebhookAdapter,
_DYNAMIC_ROUTES_FILENAME,
_INSECURE_NO_AUTH,
)
def _make_adapter(routes=None, extra=None):
@ -85,3 +89,78 @@ class TestDynamicRouteLoading:
adapter._reload_dynamic_routes()
assert "static" in adapter._routes
assert len(adapter._dynamic_routes) == 0
class TestDynamicRouteSecretValidation:
"""Empty/missing secrets must be rejected during hot-reload.
Regression for HMAC bypass: prior to the fix, an agent-induced
dynamic route with `"secret": ""` would be merged into self._routes
by _reload_dynamic_routes(), then _handle_webhook's
`if secret and secret != _INSECURE_NO_AUTH` would skip signature
validation because empty string is falsy. Unauthenticated POSTs
would then execute the webhook prompt.
"""
def test_empty_secret_rejected(self, tmp_path):
# Explicit empty-string secret must NOT fall back to the global
# secret, and the route must be skipped entirely.
(tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text(
json.dumps({"evil": {"secret": "", "prompt": "rm -rf"}})
)
adapter = _make_adapter() # has global secret
adapter._reload_dynamic_routes()
assert "evil" not in adapter._routes
assert "evil" not in adapter._dynamic_routes
def test_missing_secret_no_global_rejected(self, tmp_path):
(tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text(
json.dumps({"orphan": {"prompt": "test"}})
)
# No global secret configured
adapter = _make_adapter(extra={"secret": ""})
adapter._reload_dynamic_routes()
assert "orphan" not in adapter._routes
assert "orphan" not in adapter._dynamic_routes
def test_missing_secret_inherits_global(self, tmp_path):
# No per-route secret but a global one is set → route is kept,
# the global secret protects it. Preserves existing fallback.
(tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text(
json.dumps({"valid": {"prompt": "ok"}})
)
adapter = _make_adapter() # global secret set
adapter._reload_dynamic_routes()
assert "valid" in adapter._routes
def test_insecure_no_auth_preserved(self, tmp_path):
# Explicit opt-in escape hatch for local testing — must still load.
(tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text(
json.dumps({"test": {"secret": _INSECURE_NO_AUTH, "prompt": "p"}})
)
adapter = _make_adapter()
adapter._reload_dynamic_routes()
assert "test" in adapter._routes
def test_warning_logged_on_skip(self, tmp_path, caplog):
import logging
(tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text(
json.dumps({"silent": {"secret": "", "prompt": "x"}})
)
adapter = _make_adapter()
with caplog.at_level(logging.WARNING, logger="gateway.platforms.webhook"):
adapter._reload_dynamic_routes()
assert any("silent" in rec.message for rec in caplog.records)
def test_partial_skip(self, tmp_path):
# One route bad, one route good — only the bad one is dropped.
(tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text(
json.dumps({
"bad": {"secret": "", "prompt": "x"},
"good": {"secret": "valid-secret", "prompt": "y"},
})
)
adapter = _make_adapter()
adapter._reload_dynamic_routes()
assert "good" in adapter._routes
assert "bad" not in adapter._routes