diff --git a/gateway/platforms/bluebubbles.py b/gateway/platforms/bluebubbles.py index 1842729d23..f50cd9503c 100644 --- a/gateway/platforms/bluebubbles.py +++ b/gateway/platforms/bluebubbles.py @@ -226,24 +226,44 @@ class BlueBubblesAdapter(BasePlatformAdapter): self._runner = None self._mark_disconnected() + @property + def _webhook_url(self) -> str: + """Compute the external webhook URL for BlueBubbles registration.""" + host = self.webhook_host + if host in ("0.0.0.0", "127.0.0.1", "localhost", "::"): + host = "localhost" + return f"http://{host}:{self.webhook_port}{self.webhook_path}" + + async def _find_registered_webhooks(self, url: str) -> list: + """Return list of BB webhook entries matching *url*.""" + try: + res = await self._api_get("/api/v1/webhook") + data = res.get("data") + if isinstance(data, list): + return [wh for wh in data if wh.get("url") == url] + except Exception: + pass + return [] + async def _register_webhook(self) -> bool: """Register this webhook URL with the BlueBubbles server. BlueBubbles requires webhooks to be registered via API before - it will send events. This method registers our listener URL - for new-message and updated-message events. + it will send events. Checks for an existing registration first + to avoid duplicates (e.g. after a crash without clean shutdown). """ if not self.client: return False - webhook_url = f"http://{self.webhook_host}:{self.webhook_port}{self.webhook_path}" - # Use host.docker.internal or public IP if webhook is 0.0.0.0/127.0.0.1 - # and server is on a different host - if self.webhook_host in ("0.0.0.0", "127.0.0.1", "localhost", "::"): - # For local development, we need the external IP that BlueBubbles can reach - # Default to localhost for same-machine setups - external_host = "localhost" - webhook_url = f"http://{external_host}:{self.webhook_port}{self.webhook_path}" + webhook_url = self._webhook_url + + # Crash resilience — reuse an existing registration if present + existing = await self._find_registered_webhooks(webhook_url) + if existing: + logger.info( + "[bluebubbles] webhook already registered: %s", webhook_url + ) + return True payload = { "url": webhook_url, @@ -252,16 +272,17 @@ class BlueBubblesAdapter(BasePlatformAdapter): try: res = await self._api_post("/api/v1/webhook", payload) - if res.get("status") == 200: + status = res.get("status", 0) + if 200 <= status < 300: logger.info( - "[bluebubbles] webhook registered successfully with server: %s", + "[bluebubbles] webhook registered with server: %s", webhook_url, ) return True else: logger.warning( - "[bluebubbles] webhook registration returned non-200 status: %s - %s", - res.get("status"), + "[bluebubbles] webhook registration returned status %s: %s", + status, res.get("message"), ) return False @@ -275,41 +296,34 @@ class BlueBubblesAdapter(BasePlatformAdapter): async def _unregister_webhook(self) -> bool: """Unregister this webhook URL from the BlueBubbles server. - Cleans up the webhook registration when the gateway shuts down. + Removes *all* matching registrations to clean up any duplicates + left by prior crashes. """ if not self.client: return False - webhook_url = f"http://{self.webhook_host}:{self.webhook_port}{self.webhook_path}" - if self.webhook_host in ("0.0.0.0", "127.0.0.1", "localhost", "::"): - external_host = "localhost" - webhook_url = f"http://{external_host}:{self.webhook_port}{self.webhook_path}" + webhook_url = self._webhook_url + removed = False try: - # Get current webhooks - webhooks = await self._api_get("/api/v1/webhook") - if webhooks.get("status") == 200: - data = webhooks.get("data", []) - for webhook in data: - if webhook.get("url") == webhook_url: - # Delete this specific webhook - webhook_id = webhook.get("id") - if webhook_id: - res = await self.client.delete( - self._api_url(f"/api/v1/webhook/{webhook_id}") - ) - res.raise_for_status() - logger.info( - "[bluebubbles] webhook unregistered: %s", - webhook_url, - ) - return True + for wh in await self._find_registered_webhooks(webhook_url): + wh_id = wh.get("id") + if wh_id: + res = await self.client.delete( + self._api_url(f"/api/v1/webhook/{wh_id}") + ) + res.raise_for_status() + removed = True + if removed: + logger.info( + "[bluebubbles] webhook unregistered: %s", webhook_url + ) except Exception as exc: logger.debug( "[bluebubbles] failed to unregister webhook (non-critical): %s", exc, ) - return False + return removed # ------------------------------------------------------------------ # Chat GUID resolution diff --git a/tests/gateway/test_bluebubbles.py b/tests/gateway/test_bluebubbles.py index 939a69ff15..86220d4407 100644 --- a/tests/gateway/test_bluebubbles.py +++ b/tests/gateway/test_bluebubbles.py @@ -359,3 +359,257 @@ class TestBlueBubblesAttachmentDownload: adapter._download_attachment("att-guid", {"mimeType": "image/png"}) ) assert result is None + + +# --------------------------------------------------------------------------- +# Webhook registration +# --------------------------------------------------------------------------- + + +class TestBlueBubblesWebhookUrl: + """_webhook_url property normalises local hosts to 'localhost'.""" + + def test_default_host(self, monkeypatch): + adapter = _make_adapter(monkeypatch) + # Default webhook_host is 0.0.0.0 → normalized to localhost + assert "localhost" in adapter._webhook_url + assert str(adapter.webhook_port) in adapter._webhook_url + assert adapter.webhook_path in adapter._webhook_url + + @pytest.mark.parametrize("host", ["0.0.0.0", "127.0.0.1", "localhost", "::"]) + def test_local_hosts_normalized(self, monkeypatch, host): + adapter = _make_adapter(monkeypatch, webhook_host=host) + assert adapter._webhook_url.startswith("http://localhost:") + + def test_custom_host_preserved(self, monkeypatch): + adapter = _make_adapter(monkeypatch, webhook_host="192.168.1.50") + assert "192.168.1.50" in adapter._webhook_url + + +class TestBlueBubblesWebhookRegistration: + """Tests for _register_webhook, _unregister_webhook, _find_registered_webhooks.""" + + @staticmethod + def _mock_client(get_response=None, post_response=None, delete_ok=True): + """Build a tiny mock httpx.AsyncClient.""" + + async def mock_get(*args, **kwargs): + class R: + status_code = 200 + def raise_for_status(self): + pass + def json(self): + return get_response or {"status": 200, "data": []} + return R() + + async def mock_post(*args, **kwargs): + class R: + status_code = 200 + def raise_for_status(self): + pass + def json(self): + return post_response or {"status": 200, "data": {}} + return R() + + async def mock_delete(*args, **kwargs): + class R: + status_code = 200 if delete_ok else 500 + def raise_for_status(self_inner): + if not delete_ok: + raise Exception("delete failed") + return R() + + return type( + "MockClient", (), + {"get": mock_get, "post": mock_post, "delete": mock_delete}, + )() + + # -- _find_registered_webhooks -- + + def test_find_registered_webhooks_returns_matches(self, monkeypatch): + import asyncio + adapter = _make_adapter(monkeypatch) + url = adapter._webhook_url + adapter.client = self._mock_client( + get_response={"status": 200, "data": [ + {"id": 1, "url": url, "events": ["new-message"]}, + {"id": 2, "url": "http://other:9999/hook", "events": ["message"]}, + ]} + ) + result = asyncio.get_event_loop().run_until_complete( + adapter._find_registered_webhooks(url) + ) + assert len(result) == 1 + assert result[0]["id"] == 1 + + def test_find_registered_webhooks_empty_when_none(self, monkeypatch): + import asyncio + adapter = _make_adapter(monkeypatch) + adapter.client = self._mock_client( + get_response={"status": 200, "data": []} + ) + result = asyncio.get_event_loop().run_until_complete( + adapter._find_registered_webhooks(adapter._webhook_url) + ) + assert result == [] + + def test_find_registered_webhooks_handles_api_error(self, monkeypatch): + import asyncio + adapter = _make_adapter(monkeypatch) + adapter.client = self._mock_client() + + # Override _api_get to raise + async def bad_get(path): + raise ConnectionError("server down") + adapter._api_get = bad_get + + result = asyncio.get_event_loop().run_until_complete( + adapter._find_registered_webhooks(adapter._webhook_url) + ) + assert result == [] + + # -- _register_webhook -- + + def test_register_fresh(self, monkeypatch): + """No existing webhook → POST creates one.""" + import asyncio + adapter = _make_adapter(monkeypatch) + adapter.client = self._mock_client( + get_response={"status": 200, "data": []}, + post_response={"status": 200, "data": {"id": 42}}, + ) + ok = asyncio.get_event_loop().run_until_complete( + adapter._register_webhook() + ) + assert ok is True + + def test_register_accepts_201(self, monkeypatch): + """BB might return 201 Created — must still succeed.""" + import asyncio + adapter = _make_adapter(monkeypatch) + adapter.client = self._mock_client( + get_response={"status": 200, "data": []}, + post_response={"status": 201, "data": {"id": 43}}, + ) + ok = asyncio.get_event_loop().run_until_complete( + adapter._register_webhook() + ) + assert ok is True + + def test_register_reuses_existing(self, monkeypatch): + """Crash resilience — existing registration is reused, no POST needed.""" + import asyncio + adapter = _make_adapter(monkeypatch) + url = adapter._webhook_url + adapter.client = self._mock_client( + get_response={"status": 200, "data": [ + {"id": 7, "url": url, "events": ["new-message"]}, + ]}, + ) + + # Track whether POST was called + post_called = False + orig_api_post = adapter._api_post + async def tracking_post(path, payload): + nonlocal post_called + post_called = True + return await orig_api_post(path, payload) + adapter._api_post = tracking_post + + ok = asyncio.get_event_loop().run_until_complete( + adapter._register_webhook() + ) + assert ok is True + assert not post_called, "Should reuse existing, not POST again" + + def test_register_returns_false_without_client(self, monkeypatch): + import asyncio + adapter = _make_adapter(monkeypatch) + adapter.client = None + ok = asyncio.get_event_loop().run_until_complete( + adapter._register_webhook() + ) + assert ok is False + + def test_register_returns_false_on_server_error(self, monkeypatch): + import asyncio + adapter = _make_adapter(monkeypatch) + adapter.client = self._mock_client( + get_response={"status": 200, "data": []}, + post_response={"status": 500, "message": "internal error"}, + ) + ok = asyncio.get_event_loop().run_until_complete( + adapter._register_webhook() + ) + assert ok is False + + # -- _unregister_webhook -- + + def test_unregister_removes_matching(self, monkeypatch): + import asyncio + adapter = _make_adapter(monkeypatch) + url = adapter._webhook_url + adapter.client = self._mock_client( + get_response={"status": 200, "data": [ + {"id": 10, "url": url}, + ]}, + ) + ok = asyncio.get_event_loop().run_until_complete( + adapter._unregister_webhook() + ) + assert ok is True + + def test_unregister_removes_all_duplicates(self, monkeypatch): + """Multiple orphaned registrations for same URL — all get removed.""" + import asyncio + adapter = _make_adapter(monkeypatch) + url = adapter._webhook_url + deleted_ids = [] + + async def mock_delete(*args, **kwargs): + # Extract ID from URL + url_str = args[0] if args else "" + deleted_ids.append(url_str) + class R: + status_code = 200 + def raise_for_status(self): + pass + return R() + + adapter.client = self._mock_client( + get_response={"status": 200, "data": [ + {"id": 1, "url": url}, + {"id": 2, "url": url}, + {"id": 3, "url": "http://other/hook"}, + ]}, + ) + adapter.client.delete = mock_delete + + ok = asyncio.get_event_loop().run_until_complete( + adapter._unregister_webhook() + ) + assert ok is True + assert len(deleted_ids) == 2 + + def test_unregister_returns_false_without_client(self, monkeypatch): + import asyncio + adapter = _make_adapter(monkeypatch) + adapter.client = None + ok = asyncio.get_event_loop().run_until_complete( + adapter._unregister_webhook() + ) + assert ok is False + + def test_unregister_handles_api_failure_gracefully(self, monkeypatch): + import asyncio + adapter = _make_adapter(monkeypatch) + adapter.client = self._mock_client() + + async def bad_get(path): + raise ConnectionError("server down") + adapter._api_get = bad_get + + ok = asyncio.get_event_loop().run_until_complete( + adapter._unregister_webhook() + ) + assert ok is False