diff --git a/tests/gateway/test_webhook_dynamic_routes.py b/tests/gateway/test_webhook_dynamic_routes.py index 2029dd1399e..c185a6eb15e 100644 --- a/tests/gateway/test_webhook_dynamic_routes.py +++ b/tests/gateway/test_webhook_dynamic_routes.py @@ -6,7 +6,11 @@ import pytest from pathlib import Path from gateway.config import PlatformConfig -from gateway.platforms.webhook import WebhookAdapter, _DYNAMIC_ROUTES_FILENAME +from gateway.platforms.webhook import ( + WebhookAdapter, + _DYNAMIC_ROUTES_FILENAME, + _INSECURE_NO_AUTH, +) def _make_adapter(routes=None, extra=None): @@ -85,3 +89,78 @@ class TestDynamicRouteLoading: adapter._reload_dynamic_routes() assert "static" in adapter._routes assert len(adapter._dynamic_routes) == 0 + + +class TestDynamicRouteSecretValidation: + """Empty/missing secrets must be rejected during hot-reload. + + Regression for HMAC bypass: prior to the fix, an agent-induced + dynamic route with `"secret": ""` would be merged into self._routes + by _reload_dynamic_routes(), then _handle_webhook's + `if secret and secret != _INSECURE_NO_AUTH` would skip signature + validation because empty string is falsy. Unauthenticated POSTs + would then execute the webhook prompt. + """ + + def test_empty_secret_rejected(self, tmp_path): + # Explicit empty-string secret must NOT fall back to the global + # secret, and the route must be skipped entirely. + (tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text( + json.dumps({"evil": {"secret": "", "prompt": "rm -rf"}}) + ) + adapter = _make_adapter() # has global secret + adapter._reload_dynamic_routes() + assert "evil" not in adapter._routes + assert "evil" not in adapter._dynamic_routes + + def test_missing_secret_no_global_rejected(self, tmp_path): + (tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text( + json.dumps({"orphan": {"prompt": "test"}}) + ) + # No global secret configured + adapter = _make_adapter(extra={"secret": ""}) + adapter._reload_dynamic_routes() + assert "orphan" not in adapter._routes + assert "orphan" not in adapter._dynamic_routes + + def test_missing_secret_inherits_global(self, tmp_path): + # No per-route secret but a global one is set → route is kept, + # the global secret protects it. Preserves existing fallback. + (tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text( + json.dumps({"valid": {"prompt": "ok"}}) + ) + adapter = _make_adapter() # global secret set + adapter._reload_dynamic_routes() + assert "valid" in adapter._routes + + def test_insecure_no_auth_preserved(self, tmp_path): + # Explicit opt-in escape hatch for local testing — must still load. + (tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text( + json.dumps({"test": {"secret": _INSECURE_NO_AUTH, "prompt": "p"}}) + ) + adapter = _make_adapter() + adapter._reload_dynamic_routes() + assert "test" in adapter._routes + + def test_warning_logged_on_skip(self, tmp_path, caplog): + import logging + (tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text( + json.dumps({"silent": {"secret": "", "prompt": "x"}}) + ) + adapter = _make_adapter() + with caplog.at_level(logging.WARNING, logger="gateway.platforms.webhook"): + adapter._reload_dynamic_routes() + assert any("silent" in rec.message for rec in caplog.records) + + def test_partial_skip(self, tmp_path): + # One route bad, one route good — only the bad one is dropped. + (tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text( + json.dumps({ + "bad": {"secret": "", "prompt": "x"}, + "good": {"secret": "valid-secret", "prompt": "y"}, + }) + ) + adapter = _make_adapter() + adapter._reload_dynamic_routes() + assert "good" in adapter._routes + assert "bad" not in adapter._routes