From ccf526964a793371d4df97b4d1ae4f7d6fd63439 Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sat, 27 Jun 2026 18:49:44 -0700 Subject: [PATCH] fix(gateway): bound adapter teardown awaits on the stop path (#14128) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The main stop loop in _stop_impl() awaited adapter.cancel_background_tasks() and adapter.disconnect() with no timeout, for both the primary and the secondary-profile (multiplex) adapter maps. A half-dead platform — a wedged Feishu/Lark WebSocket thread blocked on network I/O is the reported case — makes one of those awaits block forever, so the process never exits. systemd then SIGKILLs it after TimeoutStopSec, skipping atexit PID-file cleanup, and the next start dies with 'PID file race lost' and enters a restart loop. The per-adapter timeout infra already existed on main (_adapter_disconnect_timeout_secs / HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT, default 5s) but was only wired into _safe_adapter_disconnect, which the teardown path never calls. Add _bounded_adapter_teardown(): wraps BOTH cancel_background_tasks() and disconnect() in the existing timeout budget, logs and forces forward progress on timeout, and never raises. Both teardown loops now route through it, so the stop sequence always completes regardless of any adapter's internal behavior and PID-file cleanup runs. Original report + fix direction by @happy5318 (#14128, #14130); this widens it to cover cancel_background_tasks(), the multiplex loop, and the config knob. Co-authored-by: happy5318 --- gateway/run.py | 86 +++++++---- .../gateway/test_bounded_adapter_teardown.py | 134 ++++++++++++++++++ 2 files changed, 192 insertions(+), 28 deletions(-) create mode 100644 tests/gateway/test_bounded_adapter_teardown.py diff --git a/gateway/run.py b/gateway/run.py index 8f5239ae8f3..c12c9f6cd64 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3093,6 +3093,60 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew e, ) + async def _bounded_adapter_teardown( + self, adapter, platform, *, profile: Optional[str] = None + ) -> None: + """Tear down one adapter on the shutdown path with bounded awaits. + + Both ``cancel_background_tasks()`` and ``disconnect()`` can block + indefinitely when a platform's network state is half-dead (e.g. a + wedged Feishu/Lark WebSocket thread waiting on I/O). An unbounded + await here stalls the entire shutdown sequence past systemd's + ``TimeoutStopSec``; the resulting SIGKILL skips ``atexit`` PID-file + cleanup, so the next start dies with "PID file race lost" (#14128). + + Each await is wrapped in the existing per-adapter timeout budget + (``HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT``). On timeout we log + and force forward progress; the loop never hangs regardless of any + adapter's internal behavior. Never raises. + """ + timeout = self._adapter_disconnect_timeout_secs() + suffix = f" (profile: {profile})" if profile else "" + started_at = time.monotonic() + try: + if timeout <= 0: + await adapter.cancel_background_tasks() + else: + await asyncio.wait_for( + adapter.cancel_background_tasks(), timeout=timeout + ) + except asyncio.TimeoutError: + logger.warning( + "✗ %s background-task cancel timed out after %.1fs - forcing continue%s", + platform.value, timeout, suffix, + ) + except Exception as e: + logger.debug("✗ %s background-task cancel error%s: %s", platform.value, suffix, e) + try: + if timeout <= 0: + await adapter.disconnect() + else: + await asyncio.wait_for(adapter.disconnect(), timeout=timeout) + logger.info( + "✓ %s disconnected (%.2fs)%s", + platform.value, time.monotonic() - started_at, suffix, + ) + except asyncio.TimeoutError: + logger.warning( + "✗ %s disconnect timed out after %.1fs - forcing continue%s", + platform.value, timeout, suffix, + ) + except Exception as e: + logger.error( + "✗ %s disconnect error after %.2fs%s: %s", + platform.value, time.monotonic() - started_at, suffix, e, + ) + def _adapter_disconnect_timeout_secs(self) -> float: """Return the per-adapter disconnect timeout used during shutdown.""" raw = os.getenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "").strip() @@ -7204,38 +7258,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew self._cleanup_agent_resources(_agent) for platform, adapter in list(self.adapters.items()): - _adapter_started_at = time.monotonic() - try: - await adapter.cancel_background_tasks() - except Exception as e: - logger.debug("✗ %s background-task cancel error: %s", platform.value, e) - try: - await adapter.disconnect() - logger.info( - "✓ %s disconnected (%.2fs)", - platform.value, - time.monotonic() - _adapter_started_at, - ) - except Exception as e: - logger.error( - "✗ %s disconnect error after %.2fs: %s", - platform.value, - time.monotonic() - _adapter_started_at, - e, - ) + await self._bounded_adapter_teardown(adapter, platform) # Disconnect secondary-profile adapters (multiplex mode). for _prof, _amap in list(getattr(self, "_profile_adapters", {}).items()): for platform, adapter in list(_amap.items()): - try: - await adapter.cancel_background_tasks() - except Exception as e: - logger.debug("✗ %s bg-cancel error (profile %s): %s", platform.value, _prof, e) - try: - await adapter.disconnect() - logger.info("✓ %s disconnected (profile: %s)", platform.value, _prof) - except Exception as e: - logger.error("✗ %s disconnect error (profile %s): %s", platform.value, _prof, e) + await self._bounded_adapter_teardown( + adapter, platform, profile=_prof + ) _amap.clear() if hasattr(self, "_profile_adapters"): self._profile_adapters.clear() diff --git a/tests/gateway/test_bounded_adapter_teardown.py b/tests/gateway/test_bounded_adapter_teardown.py new file mode 100644 index 00000000000..abe20608d41 --- /dev/null +++ b/tests/gateway/test_bounded_adapter_teardown.py @@ -0,0 +1,134 @@ +"""Regression tests: the shutdown teardown loop must not hang on a wedged adapter. + +`GatewayRunner._stop_impl()` tears down every adapter by awaiting +`cancel_background_tasks()` then `disconnect()`. Both calls can block +indefinitely when a platform's network state is half-dead (e.g. a wedged +Feishu/Lark WebSocket thread waiting on I/O). An unbounded await stalls the +whole shutdown past systemd's TimeoutStopSec; the resulting SIGKILL skips +atexit PID-file cleanup, so the next start dies with "PID file race lost" +(#14128). + +The fix routes both teardown loops through `_bounded_adapter_teardown`, +which wraps each await in the existing per-adapter timeout budget +(HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT) and always returns. +""" + +import asyncio +import logging +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from gateway.config import Platform +from gateway.run import GatewayRunner + + +@pytest.fixture +def bare_runner(): + """A GatewayRunner shell that only needs _bounded_adapter_teardown.""" + return object.__new__(GatewayRunner) + + +@pytest.mark.asyncio +async def test_teardown_calls_both_methods(bare_runner): + """The helper cancels background tasks AND disconnects, in that order.""" + calls = [] + adapter = MagicMock() + adapter.cancel_background_tasks = AsyncMock( + side_effect=lambda: calls.append("cancel") + ) + adapter.disconnect = AsyncMock(side_effect=lambda: calls.append("disconnect")) + + await bare_runner._bounded_adapter_teardown(adapter, Platform.TELEGRAM) + + adapter.cancel_background_tasks.assert_awaited_once() + adapter.disconnect.assert_awaited_once() + assert calls == ["cancel", "disconnect"] + + +@pytest.mark.asyncio +async def test_teardown_bounds_hanging_disconnect(bare_runner, monkeypatch, caplog): + """A wedged disconnect() must time out instead of hanging the loop.""" + monkeypatch.setenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "0.01") + adapter = MagicMock() + adapter.cancel_background_tasks = AsyncMock(return_value=None) + + async def hang(): + await asyncio.sleep(60) + + adapter.disconnect = AsyncMock(side_effect=hang) + + with caplog.at_level(logging.WARNING, logger="gateway.run"): + await asyncio.wait_for( + bare_runner._bounded_adapter_teardown(adapter, Platform.FEISHU), + timeout=5.0, # the helper itself must return well under this + ) + + adapter.disconnect.assert_awaited_once() + assert "feishu disconnect timed out" in caplog.text + + +@pytest.mark.asyncio +async def test_teardown_bounds_hanging_cancel(bare_runner, monkeypatch, caplog): + """A wedged cancel_background_tasks() must time out, then disconnect runs.""" + monkeypatch.setenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "0.01") + adapter = MagicMock() + + async def hang(): + await asyncio.sleep(60) + + adapter.cancel_background_tasks = AsyncMock(side_effect=hang) + adapter.disconnect = AsyncMock(return_value=None) + + with caplog.at_level(logging.WARNING, logger="gateway.run"): + await asyncio.wait_for( + bare_runner._bounded_adapter_teardown(adapter, Platform.FEISHU), + timeout=5.0, + ) + + assert "feishu background-task cancel timed out" in caplog.text + # disconnect still attempted after the cancel timeout — forward progress. + adapter.disconnect.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_teardown_swallows_exceptions(bare_runner): + """Errors in either await must not propagate — shutdown continues.""" + adapter = MagicMock() + adapter.cancel_background_tasks = AsyncMock(side_effect=RuntimeError("bg")) + adapter.disconnect = AsyncMock(side_effect=RuntimeError("disc")) + + # Must NOT raise. + await bare_runner._bounded_adapter_teardown(adapter, Platform.TELEGRAM) + + adapter.cancel_background_tasks.assert_awaited_once() + adapter.disconnect.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_teardown_profile_suffix_in_logs(bare_runner, caplog): + """Multiplex (secondary-profile) teardown tags log lines with the profile.""" + adapter = MagicMock() + adapter.cancel_background_tasks = AsyncMock(return_value=None) + adapter.disconnect = AsyncMock(return_value=None) + + with caplog.at_level(logging.INFO, logger="gateway.run"): + await bare_runner._bounded_adapter_teardown( + adapter, Platform.TELEGRAM, profile="acct2" + ) + + assert "(profile: acct2)" in caplog.text + + +@pytest.mark.asyncio +async def test_teardown_timeout_zero_disables_bound(bare_runner, monkeypatch): + """timeout=0 disables the wait_for wrapper but still calls through.""" + monkeypatch.setenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "0") + adapter = MagicMock() + adapter.cancel_background_tasks = AsyncMock(return_value=None) + adapter.disconnect = AsyncMock(return_value=None) + + await bare_runner._bounded_adapter_teardown(adapter, Platform.TELEGRAM) + + adapter.cancel_background_tasks.assert_awaited_once() + adapter.disconnect.assert_awaited_once()