diff --git a/gateway/config.py b/gateway/config.py index 342af97648..7d61659279 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -665,6 +665,17 @@ def load_gateway_config() -> GatewayConfig: _apply_env_overrides(config) # --- Validate loaded values --- + _validate_gateway_config(config) + + return config + + +def _validate_gateway_config(config: "GatewayConfig") -> None: + """Validate and sanitize a loaded GatewayConfig in place. + + Called by ``load_gateway_config()`` after all config sources are merged. + Extracted as a separate function for testability. + """ policy = config.default_reset_policy if not (0 <= policy.at_hour <= 23): @@ -701,7 +712,31 @@ def load_gateway_config() -> GatewayConfig: platform.value, env_name, ) - return config + # Reject known-weak placeholder tokens. + # Ported from openclaw/openclaw#64586: users who copy .env.example + # without changing placeholder values get a clear startup error instead + # of a confusing "auth failed" from the platform API. + try: + from hermes_cli.auth import has_usable_secret + except ImportError: + has_usable_secret = None # type: ignore[assignment] + + if has_usable_secret is not None: + for platform, pconfig in config.platforms.items(): + if not pconfig.enabled: + continue + env_name = _token_env_names.get(platform) + if not env_name: + continue + token = pconfig.token + if token and token.strip() and not has_usable_secret(token, min_length=4): + logger.error( + "%s is enabled but %s is set to a placeholder value ('%s'). " + "Set a real bot token before starting the gateway. " + "The adapter will NOT be started.", + platform.value, env_name, token.strip()[:6] + "...", + ) + pconfig.enabled = False def _apply_env_overrides(config: GatewayConfig) -> None: diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index df3fbe1d30..9a49904659 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -1819,6 +1819,23 @@ class APIServerAdapter(BasePlatformAdapter): ) return False + # Refuse to start network-accessible with a placeholder key. + # Ported from openclaw/openclaw#64586. + if is_network_accessible(self._host) and self._api_key: + try: + from hermes_cli.auth import has_usable_secret + if not has_usable_secret(self._api_key, min_length=8): + logger.error( + "[%s] Refusing to start: API_SERVER_KEY is set to a " + "placeholder value. Generate a real secret " + "(e.g. `openssl rand -hex 32`) and set API_SERVER_KEY " + "before exposing the API server on %s.", + self.name, self._host, + ) + return False + except ImportError: + pass + # Port conflict detection — fail fast if port is already in use try: with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as _s: diff --git a/tests/gateway/test_weak_credential_guard.py b/tests/gateway/test_weak_credential_guard.py new file mode 100644 index 0000000000..7d6ea84b3f --- /dev/null +++ b/tests/gateway/test_weak_credential_guard.py @@ -0,0 +1,141 @@ +"""Tests for gateway weak credential rejection at startup. + +Ported from openclaw/openclaw#64586: rejects known-weak placeholder +tokens at gateway startup instead of letting them silently fail +against platform APIs. +""" + +import logging + +import pytest + +from gateway.config import PlatformConfig, Platform, _validate_gateway_config + + +# --------------------------------------------------------------------------- +# Helper: create a minimal GatewayConfig with one enabled platform +# --------------------------------------------------------------------------- + + +def _make_gateway_config(platform, token, enabled=True, **extra_kwargs): + """Create a minimal GatewayConfig-like object for validation testing.""" + from gateway.config import GatewayConfig + + config = GatewayConfig(platforms={}) + pconfig = PlatformConfig(enabled=enabled, token=token, **extra_kwargs) + config.platforms[platform] = pconfig + return config + + +def _validate_and_return(config): + """Call _validate_gateway_config and return the config (mutated in place).""" + _validate_gateway_config(config) + return config + + +# --------------------------------------------------------------------------- +# Unit tests: platform token placeholder rejection +# --------------------------------------------------------------------------- + + +class TestPlatformTokenPlaceholderGuard: + """Verify that _validate_gateway_config disables platforms with placeholder tokens.""" + + def test_rejects_triple_asterisk(self, caplog): + """'***' is the .env.example placeholder — should be rejected.""" + config = _make_gateway_config(Platform.TELEGRAM, "***") + with caplog.at_level(logging.ERROR): + _validate_and_return(config) + assert config.platforms[Platform.TELEGRAM].enabled is False + assert "placeholder" in caplog.text.lower() + + def test_rejects_changeme(self, caplog): + config = _make_gateway_config(Platform.DISCORD, "changeme") + with caplog.at_level(logging.ERROR): + _validate_and_return(config) + assert config.platforms[Platform.DISCORD].enabled is False + + def test_rejects_your_api_key(self, caplog): + config = _make_gateway_config(Platform.SLACK, "your_api_key") + with caplog.at_level(logging.ERROR): + _validate_and_return(config) + assert config.platforms[Platform.SLACK].enabled is False + + def test_rejects_placeholder(self, caplog): + config = _make_gateway_config(Platform.MATRIX, "placeholder") + with caplog.at_level(logging.ERROR): + _validate_and_return(config) + assert config.platforms[Platform.MATRIX].enabled is False + + def test_accepts_real_token(self, caplog): + """A real-looking bot token should pass validation.""" + config = _make_gateway_config( + Platform.TELEGRAM, "7123456789:AAHdqTcvCH1vGWJxfSeOfSAs0K5PALDsaw" + ) + with caplog.at_level(logging.ERROR): + _validate_and_return(config) + assert config.platforms[Platform.TELEGRAM].enabled is True + assert "placeholder" not in caplog.text.lower() + + def test_accepts_empty_token_without_error(self, caplog): + """Empty tokens get a warning (existing behavior), not a placeholder error.""" + config = _make_gateway_config(Platform.TELEGRAM, "") + with caplog.at_level(logging.WARNING): + _validate_and_return(config) + # Empty token doesn't trigger placeholder rejection — enabled stays True + # (the existing empty-token warning is separate) + assert config.platforms[Platform.TELEGRAM].enabled is True + + def test_disabled_platform_not_checked(self, caplog): + """Disabled platforms should not be validated.""" + config = _make_gateway_config(Platform.TELEGRAM, "***", enabled=False) + with caplog.at_level(logging.ERROR): + _validate_and_return(config) + assert "placeholder" not in caplog.text.lower() + + def test_rejects_whitespace_padded_placeholder(self, caplog): + """Whitespace-padded placeholders should still be caught.""" + config = _make_gateway_config(Platform.TELEGRAM, " *** ") + with caplog.at_level(logging.ERROR): + _validate_and_return(config) + assert config.platforms[Platform.TELEGRAM].enabled is False + + +# --------------------------------------------------------------------------- +# Integration test: API server placeholder key on network-accessible host +# --------------------------------------------------------------------------- + + +class TestAPIServerPlaceholderKeyGuard: + """Verify that the API server rejects placeholder keys on network hosts.""" + + @pytest.mark.asyncio + async def test_refuses_wildcard_with_placeholder_key(self): + from gateway.platforms.api_server import APIServerAdapter + + adapter = APIServerAdapter( + PlatformConfig(enabled=True, extra={"host": "0.0.0.0", "key": "changeme"}) + ) + result = await adapter.connect() + assert result is False + + @pytest.mark.asyncio + async def test_refuses_wildcard_with_asterisk_key(self): + from gateway.platforms.api_server import APIServerAdapter + + adapter = APIServerAdapter( + PlatformConfig(enabled=True, extra={"host": "0.0.0.0", "key": "***"}) + ) + result = await adapter.connect() + assert result is False + + def test_allows_loopback_with_placeholder_key(self): + """Loopback with a placeholder key is fine — not network-exposed.""" + from gateway.platforms.api_server import APIServerAdapter + from gateway.platforms.base import is_network_accessible + + adapter = APIServerAdapter( + PlatformConfig(enabled=True, extra={"host": "127.0.0.1", "key": "changeme"}) + ) + # On loopback the placeholder guard doesn't fire + assert is_network_accessible(adapter._host) is False