diff --git a/gateway/run.py b/gateway/run.py index 8f5239ae8f3..c12c9f6cd64 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3093,6 +3093,60 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew e, ) + async def _bounded_adapter_teardown( + self, adapter, platform, *, profile: Optional[str] = None + ) -> None: + """Tear down one adapter on the shutdown path with bounded awaits. + + Both ``cancel_background_tasks()`` and ``disconnect()`` can block + indefinitely when a platform's network state is half-dead (e.g. a + wedged Feishu/Lark WebSocket thread waiting on I/O). An unbounded + await here stalls the entire shutdown sequence past systemd's + ``TimeoutStopSec``; the resulting SIGKILL skips ``atexit`` PID-file + cleanup, so the next start dies with "PID file race lost" (#14128). + + Each await is wrapped in the existing per-adapter timeout budget + (``HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT``). On timeout we log + and force forward progress; the loop never hangs regardless of any + adapter's internal behavior. Never raises. + """ + timeout = self._adapter_disconnect_timeout_secs() + suffix = f" (profile: {profile})" if profile else "" + started_at = time.monotonic() + try: + if timeout <= 0: + await adapter.cancel_background_tasks() + else: + await asyncio.wait_for( + adapter.cancel_background_tasks(), timeout=timeout + ) + except asyncio.TimeoutError: + logger.warning( + "✗ %s background-task cancel timed out after %.1fs - forcing continue%s", + platform.value, timeout, suffix, + ) + except Exception as e: + logger.debug("✗ %s background-task cancel error%s: %s", platform.value, suffix, e) + try: + if timeout <= 0: + await adapter.disconnect() + else: + await asyncio.wait_for(adapter.disconnect(), timeout=timeout) + logger.info( + "✓ %s disconnected (%.2fs)%s", + platform.value, time.monotonic() - started_at, suffix, + ) + except asyncio.TimeoutError: + logger.warning( + "✗ %s disconnect timed out after %.1fs - forcing continue%s", + platform.value, timeout, suffix, + ) + except Exception as e: + logger.error( + "✗ %s disconnect error after %.2fs%s: %s", + platform.value, time.monotonic() - started_at, suffix, e, + ) + def _adapter_disconnect_timeout_secs(self) -> float: """Return the per-adapter disconnect timeout used during shutdown.""" raw = os.getenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "").strip() @@ -7204,38 +7258,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew self._cleanup_agent_resources(_agent) for platform, adapter in list(self.adapters.items()): - _adapter_started_at = time.monotonic() - try: - await adapter.cancel_background_tasks() - except Exception as e: - logger.debug("✗ %s background-task cancel error: %s", platform.value, e) - try: - await adapter.disconnect() - logger.info( - "✓ %s disconnected (%.2fs)", - platform.value, - time.monotonic() - _adapter_started_at, - ) - except Exception as e: - logger.error( - "✗ %s disconnect error after %.2fs: %s", - platform.value, - time.monotonic() - _adapter_started_at, - e, - ) + await self._bounded_adapter_teardown(adapter, platform) # Disconnect secondary-profile adapters (multiplex mode). for _prof, _amap in list(getattr(self, "_profile_adapters", {}).items()): for platform, adapter in list(_amap.items()): - try: - await adapter.cancel_background_tasks() - except Exception as e: - logger.debug("✗ %s bg-cancel error (profile %s): %s", platform.value, _prof, e) - try: - await adapter.disconnect() - logger.info("✓ %s disconnected (profile: %s)", platform.value, _prof) - except Exception as e: - logger.error("✗ %s disconnect error (profile %s): %s", platform.value, _prof, e) + await self._bounded_adapter_teardown( + adapter, platform, profile=_prof + ) _amap.clear() if hasattr(self, "_profile_adapters"): self._profile_adapters.clear() diff --git a/tests/gateway/test_bounded_adapter_teardown.py b/tests/gateway/test_bounded_adapter_teardown.py new file mode 100644 index 00000000000..abe20608d41 --- /dev/null +++ b/tests/gateway/test_bounded_adapter_teardown.py @@ -0,0 +1,134 @@ +"""Regression tests: the shutdown teardown loop must not hang on a wedged adapter. + +`GatewayRunner._stop_impl()` tears down every adapter by awaiting +`cancel_background_tasks()` then `disconnect()`. Both calls can block +indefinitely when a platform's network state is half-dead (e.g. a wedged +Feishu/Lark WebSocket thread waiting on I/O). An unbounded await stalls the +whole shutdown past systemd's TimeoutStopSec; the resulting SIGKILL skips +atexit PID-file cleanup, so the next start dies with "PID file race lost" +(#14128). + +The fix routes both teardown loops through `_bounded_adapter_teardown`, +which wraps each await in the existing per-adapter timeout budget +(HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT) and always returns. +""" + +import asyncio +import logging +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from gateway.config import Platform +from gateway.run import GatewayRunner + + +@pytest.fixture +def bare_runner(): + """A GatewayRunner shell that only needs _bounded_adapter_teardown.""" + return object.__new__(GatewayRunner) + + +@pytest.mark.asyncio +async def test_teardown_calls_both_methods(bare_runner): + """The helper cancels background tasks AND disconnects, in that order.""" + calls = [] + adapter = MagicMock() + adapter.cancel_background_tasks = AsyncMock( + side_effect=lambda: calls.append("cancel") + ) + adapter.disconnect = AsyncMock(side_effect=lambda: calls.append("disconnect")) + + await bare_runner._bounded_adapter_teardown(adapter, Platform.TELEGRAM) + + adapter.cancel_background_tasks.assert_awaited_once() + adapter.disconnect.assert_awaited_once() + assert calls == ["cancel", "disconnect"] + + +@pytest.mark.asyncio +async def test_teardown_bounds_hanging_disconnect(bare_runner, monkeypatch, caplog): + """A wedged disconnect() must time out instead of hanging the loop.""" + monkeypatch.setenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "0.01") + adapter = MagicMock() + adapter.cancel_background_tasks = AsyncMock(return_value=None) + + async def hang(): + await asyncio.sleep(60) + + adapter.disconnect = AsyncMock(side_effect=hang) + + with caplog.at_level(logging.WARNING, logger="gateway.run"): + await asyncio.wait_for( + bare_runner._bounded_adapter_teardown(adapter, Platform.FEISHU), + timeout=5.0, # the helper itself must return well under this + ) + + adapter.disconnect.assert_awaited_once() + assert "feishu disconnect timed out" in caplog.text + + +@pytest.mark.asyncio +async def test_teardown_bounds_hanging_cancel(bare_runner, monkeypatch, caplog): + """A wedged cancel_background_tasks() must time out, then disconnect runs.""" + monkeypatch.setenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "0.01") + adapter = MagicMock() + + async def hang(): + await asyncio.sleep(60) + + adapter.cancel_background_tasks = AsyncMock(side_effect=hang) + adapter.disconnect = AsyncMock(return_value=None) + + with caplog.at_level(logging.WARNING, logger="gateway.run"): + await asyncio.wait_for( + bare_runner._bounded_adapter_teardown(adapter, Platform.FEISHU), + timeout=5.0, + ) + + assert "feishu background-task cancel timed out" in caplog.text + # disconnect still attempted after the cancel timeout — forward progress. + adapter.disconnect.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_teardown_swallows_exceptions(bare_runner): + """Errors in either await must not propagate — shutdown continues.""" + adapter = MagicMock() + adapter.cancel_background_tasks = AsyncMock(side_effect=RuntimeError("bg")) + adapter.disconnect = AsyncMock(side_effect=RuntimeError("disc")) + + # Must NOT raise. + await bare_runner._bounded_adapter_teardown(adapter, Platform.TELEGRAM) + + adapter.cancel_background_tasks.assert_awaited_once() + adapter.disconnect.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_teardown_profile_suffix_in_logs(bare_runner, caplog): + """Multiplex (secondary-profile) teardown tags log lines with the profile.""" + adapter = MagicMock() + adapter.cancel_background_tasks = AsyncMock(return_value=None) + adapter.disconnect = AsyncMock(return_value=None) + + with caplog.at_level(logging.INFO, logger="gateway.run"): + await bare_runner._bounded_adapter_teardown( + adapter, Platform.TELEGRAM, profile="acct2" + ) + + assert "(profile: acct2)" in caplog.text + + +@pytest.mark.asyncio +async def test_teardown_timeout_zero_disables_bound(bare_runner, monkeypatch): + """timeout=0 disables the wait_for wrapper but still calls through.""" + monkeypatch.setenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "0") + adapter = MagicMock() + adapter.cancel_background_tasks = AsyncMock(return_value=None) + adapter.disconnect = AsyncMock(return_value=None) + + await bare_runner._bounded_adapter_teardown(adapter, Platform.TELEGRAM) + + adapter.cancel_background_tasks.assert_awaited_once() + adapter.disconnect.assert_awaited_once()