fix(gateway): prevent scoped lock and resource leaks on connection failure

This commit is contained in:
Ruzzgar 2026-04-19 08:51:34 +03:00 committed by Teknium
parent a5063ff105
commit f23123e7b4
6 changed files with 127 additions and 36 deletions

View file

@ -223,31 +223,40 @@ class SignalAdapter(BasePlatformAdapter):
return False
# Acquire scoped lock to prevent duplicate Signal listeners for the same phone
lock_acquired = False
try:
if not self._acquire_platform_lock('signal-phone', self.account, 'Signal account'):
return False
lock_acquired = True
except Exception as e:
logger.warning("Signal: Could not acquire phone lock (non-fatal): %s", e)
self.client = httpx.AsyncClient(timeout=30.0)
# Health check — verify signal-cli daemon is reachable
try:
resp = await self.client.get(f"{self.http_url}/api/v1/check", timeout=10.0)
if resp.status_code != 200:
logger.error("Signal: health check failed (status %d)", resp.status_code)
# Health check — verify signal-cli daemon is reachable
try:
resp = await self.client.get(f"{self.http_url}/api/v1/check", timeout=10.0)
if resp.status_code != 200:
logger.error("Signal: health check failed (status %d)", resp.status_code)
return False
except Exception as e:
logger.error("Signal: cannot reach signal-cli at %s: %s", self.http_url, e)
return False
except Exception as e:
logger.error("Signal: cannot reach signal-cli at %s: %s", self.http_url, e)
return False
self._running = True
self._last_sse_activity = time.time()
self._sse_task = asyncio.create_task(self._sse_listener())
self._health_monitor_task = asyncio.create_task(self._health_monitor())
self._running = True
self._last_sse_activity = time.time()
self._sse_task = asyncio.create_task(self._sse_listener())
self._health_monitor_task = asyncio.create_task(self._health_monitor())
logger.info("Signal: connected to %s", self.http_url)
return True
logger.info("Signal: connected to %s", self.http_url)
return True
finally:
if not self._running:
if self.client:
await self.client.aclose()
self.client = None
if lock_acquired:
self._release_platform_lock()
async def disconnect(self) -> None:
"""Stop SSE listener and clean up."""

View file

@ -150,9 +150,11 @@ class SlackAdapter(BasePlatformAdapter):
except Exception as e:
logger.warning("[Slack] Failed to read %s: %s", tokens_file, e)
lock_acquired = False
try:
if not self._acquire_platform_lock('slack-app-token', app_token, 'Slack app token'):
return False
lock_acquired = True
# First token is the primary — used for AsyncApp / Socket Mode
primary_token = bot_tokens[0]
@ -228,6 +230,9 @@ class SlackAdapter(BasePlatformAdapter):
except Exception as e: # pragma: no cover - defensive logging
logger.error("[Slack] Connection failed: %s", e, exc_info=True)
return False
finally:
if lock_acquired and not self._running:
self._release_platform_lock()
async def disconnect(self) -> None:
"""Disconnect from Slack."""

View file

@ -289,33 +289,35 @@ class WhatsAppAdapter(BasePlatformAdapter):
logger.info("[%s] Bridge found at %s", self.name, bridge_path)
# Acquire scoped lock to prevent duplicate sessions
lock_acquired = False
try:
if not self._acquire_platform_lock('whatsapp-session', str(self._session_path), 'WhatsApp session'):
return False
lock_acquired = True
except Exception as e:
logger.warning("[%s] Could not acquire session lock (non-fatal): %s", self.name, e)
# Auto-install npm dependencies if node_modules doesn't exist
bridge_dir = bridge_path.parent
if not (bridge_dir / "node_modules").exists():
print(f"[{self.name}] Installing WhatsApp bridge dependencies...")
try:
install_result = subprocess.run(
["npm", "install", "--silent"],
cwd=str(bridge_dir),
capture_output=True,
text=True,
timeout=60,
)
if install_result.returncode != 0:
print(f"[{self.name}] npm install failed: {install_result.stderr}")
return False
print(f"[{self.name}] Dependencies installed")
except Exception as e:
print(f"[{self.name}] Failed to install dependencies: {e}")
return False
try:
# Auto-install npm dependencies if node_modules doesn't exist
bridge_dir = bridge_path.parent
if not (bridge_dir / "node_modules").exists():
print(f"[{self.name}] Installing WhatsApp bridge dependencies...")
try:
install_result = subprocess.run(
["npm", "install", "--silent"],
cwd=str(bridge_dir),
capture_output=True,
text=True,
timeout=60,
)
if install_result.returncode != 0:
print(f"[{self.name}] npm install failed: {install_result.stderr}")
return False
print(f"[{self.name}] Dependencies installed")
except Exception as e:
print(f"[{self.name}] Failed to install dependencies: {e}")
return False
# Ensure session directory exists
self._session_path.mkdir(parents=True, exist_ok=True)
@ -452,10 +454,13 @@ class WhatsAppAdapter(BasePlatformAdapter):
return True
except Exception as e:
self._release_platform_lock()
logger.error("[%s] Failed to start bridge: %s", self.name, e, exc_info=True)
self._close_bridge_log()
return False
finally:
if not self._running:
if lock_acquired:
self._release_platform_lock()
self._close_bridge_log()
def _close_bridge_log(self) -> None:
"""Close the bridge log file handle if open."""

View file

@ -91,6 +91,29 @@ class TestSignalAdapterInit:
assert adapter._account_normalized == "+15551234567"
class TestSignalConnectCleanup:
"""Regression coverage for failed connect() cleanup."""
@pytest.mark.asyncio
async def test_releases_lock_and_closes_client_on_healthcheck_failure(self, monkeypatch):
adapter = _make_signal_adapter(monkeypatch)
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=MagicMock(status_code=503))
mock_client.aclose = AsyncMock()
with patch("gateway.platforms.signal.httpx.AsyncClient", return_value=mock_client), \
patch("gateway.status.acquire_scoped_lock", return_value=(True, None)), \
patch("gateway.status.release_scoped_lock") as mock_release:
result = await adapter.connect()
assert result is False
mock_client.aclose.assert_awaited_once()
mock_release.assert_called_once_with("signal-phone", "+15551234567")
assert adapter.client is None
assert adapter._platform_lock_identity is None
class TestSignalHelpers:
def test_redact_phone_long(self):
from gateway.platforms.helpers import redact_phone

View file

@ -150,6 +150,31 @@ class TestAppMentionHandler:
assert "/hermes" in registered_commands
class TestSlackConnectCleanup:
"""Regression coverage for failed connect() cleanup."""
@pytest.mark.asyncio
async def test_releases_platform_lock_when_auth_fails(self):
config = PlatformConfig(enabled=True, token="xoxb-fake")
adapter = SlackAdapter(config)
mock_app = MagicMock()
mock_web_client = AsyncMock()
mock_web_client.auth_test = AsyncMock(side_effect=RuntimeError("boom"))
with patch.object(_slack_mod, "AsyncApp", return_value=mock_app), \
patch.object(_slack_mod, "AsyncWebClient", return_value=mock_web_client), \
patch.object(_slack_mod, "AsyncSocketModeHandler", return_value=MagicMock()), \
patch.dict(os.environ, {"SLACK_APP_TOKEN": "xapp-fake"}), \
patch("gateway.status.acquire_scoped_lock", return_value=(True, None)), \
patch("gateway.status.release_scoped_lock") as mock_release:
result = await adapter.connect()
assert result is False
mock_release.assert_called_once_with("slack-app-token", "xapp-fake")
assert adapter._platform_lock_identity is None
# ---------------------------------------------------------------------------
# TestSendDocument
# ---------------------------------------------------------------------------

View file

@ -211,6 +211,30 @@ class TestFileHandleClosedOnError:
assert adapter._bridge_log_fh is None
class TestConnectCleanup:
"""Verify failure paths release the scoped session lock."""
@pytest.mark.asyncio
async def test_releases_lock_when_npm_install_fails(self):
adapter = _make_adapter()
def _path_exists(path_obj):
return not str(path_obj).endswith("node_modules")
install_result = MagicMock(returncode=1, stderr="install failed")
with patch("gateway.platforms.whatsapp.check_whatsapp_requirements", return_value=True), \
patch.object(Path, "exists", autospec=True, side_effect=_path_exists), \
patch("subprocess.run", return_value=install_result), \
patch("gateway.status.acquire_scoped_lock", return_value=(True, None)), \
patch("gateway.status.release_scoped_lock") as mock_release:
result = await adapter.connect()
assert result is False
mock_release.assert_called_once_with("whatsapp-session", str(adapter._session_path))
assert adapter._platform_lock_identity is None
class TestBridgeRuntimeFailure:
"""Verify runtime bridge death is surfaced as a fatal adapter error."""