fix(gateway): bound adapter teardown awaits on the stop path (#14128)

The main stop loop in _stop_impl() awaited adapter.cancel_background_tasks()
and adapter.disconnect() with no timeout, for both the primary and the
secondary-profile (multiplex) adapter maps. A half-dead platform — a wedged
Feishu/Lark WebSocket thread blocked on network I/O is the reported case —
makes one of those awaits block forever, so the process never exits. systemd
then SIGKILLs it after TimeoutStopSec, skipping atexit PID-file cleanup, and
the next start dies with 'PID file race lost' and enters a restart loop.

The per-adapter timeout infra already existed on main
(_adapter_disconnect_timeout_secs / HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT,
default 5s) but was only wired into _safe_adapter_disconnect, which the
teardown path never calls.

Add _bounded_adapter_teardown(): wraps BOTH cancel_background_tasks() and
disconnect() in the existing timeout budget, logs and forces forward progress
on timeout, and never raises. Both teardown loops now route through it, so the
stop sequence always completes regardless of any adapter's internal behavior
and PID-file cleanup runs.

Original report + fix direction by @happy5318 (#14128, #14130); this widens it
to cover cancel_background_tasks(), the multiplex loop, and the config knob.

Co-authored-by: happy5318 <happy5318@users.noreply.github.com>
This commit is contained in:
teknium1 2026-06-27 18:49:44 -07:00 committed by Teknium
parent 6717cfc805
commit ccf526964a
2 changed files with 192 additions and 28 deletions

View file

@ -3093,6 +3093,60 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
e,
)
async def _bounded_adapter_teardown(
self, adapter, platform, *, profile: Optional[str] = None
) -> None:
"""Tear down one adapter on the shutdown path with bounded awaits.
Both ``cancel_background_tasks()`` and ``disconnect()`` can block
indefinitely when a platform's network state is half-dead (e.g. a
wedged Feishu/Lark WebSocket thread waiting on I/O). An unbounded
await here stalls the entire shutdown sequence past systemd's
``TimeoutStopSec``; the resulting SIGKILL skips ``atexit`` PID-file
cleanup, so the next start dies with "PID file race lost" (#14128).
Each await is wrapped in the existing per-adapter timeout budget
(``HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT``). On timeout we log
and force forward progress; the loop never hangs regardless of any
adapter's internal behavior. Never raises.
"""
timeout = self._adapter_disconnect_timeout_secs()
suffix = f" (profile: {profile})" if profile else ""
started_at = time.monotonic()
try:
if timeout <= 0:
await adapter.cancel_background_tasks()
else:
await asyncio.wait_for(
adapter.cancel_background_tasks(), timeout=timeout
)
except asyncio.TimeoutError:
logger.warning(
"%s background-task cancel timed out after %.1fs - forcing continue%s",
platform.value, timeout, suffix,
)
except Exception as e:
logger.debug("%s background-task cancel error%s: %s", platform.value, suffix, e)
try:
if timeout <= 0:
await adapter.disconnect()
else:
await asyncio.wait_for(adapter.disconnect(), timeout=timeout)
logger.info(
"%s disconnected (%.2fs)%s",
platform.value, time.monotonic() - started_at, suffix,
)
except asyncio.TimeoutError:
logger.warning(
"%s disconnect timed out after %.1fs - forcing continue%s",
platform.value, timeout, suffix,
)
except Exception as e:
logger.error(
"%s disconnect error after %.2fs%s: %s",
platform.value, time.monotonic() - started_at, suffix, e,
)
def _adapter_disconnect_timeout_secs(self) -> float:
"""Return the per-adapter disconnect timeout used during shutdown."""
raw = os.getenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "").strip()
@ -7204,38 +7258,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
self._cleanup_agent_resources(_agent)
for platform, adapter in list(self.adapters.items()):
_adapter_started_at = time.monotonic()
try:
await adapter.cancel_background_tasks()
except Exception as e:
logger.debug("%s background-task cancel error: %s", platform.value, e)
try:
await adapter.disconnect()
logger.info(
"%s disconnected (%.2fs)",
platform.value,
time.monotonic() - _adapter_started_at,
)
except Exception as e:
logger.error(
"%s disconnect error after %.2fs: %s",
platform.value,
time.monotonic() - _adapter_started_at,
e,
)
await self._bounded_adapter_teardown(adapter, platform)
# Disconnect secondary-profile adapters (multiplex mode).
for _prof, _amap in list(getattr(self, "_profile_adapters", {}).items()):
for platform, adapter in list(_amap.items()):
try:
await adapter.cancel_background_tasks()
except Exception as e:
logger.debug("%s bg-cancel error (profile %s): %s", platform.value, _prof, e)
try:
await adapter.disconnect()
logger.info("%s disconnected (profile: %s)", platform.value, _prof)
except Exception as e:
logger.error("%s disconnect error (profile %s): %s", platform.value, _prof, e)
await self._bounded_adapter_teardown(
adapter, platform, profile=_prof
)
_amap.clear()
if hasattr(self, "_profile_adapters"):
self._profile_adapters.clear()

View file

@ -0,0 +1,134 @@
"""Regression tests: the shutdown teardown loop must not hang on a wedged adapter.
`GatewayRunner._stop_impl()` tears down every adapter by awaiting
`cancel_background_tasks()` then `disconnect()`. Both calls can block
indefinitely when a platform's network state is half-dead (e.g. a wedged
Feishu/Lark WebSocket thread waiting on I/O). An unbounded await stalls the
whole shutdown past systemd's TimeoutStopSec; the resulting SIGKILL skips
atexit PID-file cleanup, so the next start dies with "PID file race lost"
(#14128).
The fix routes both teardown loops through `_bounded_adapter_teardown`,
which wraps each await in the existing per-adapter timeout budget
(HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT) and always returns.
"""
import asyncio
import logging
from unittest.mock import AsyncMock, MagicMock
import pytest
from gateway.config import Platform
from gateway.run import GatewayRunner
@pytest.fixture
def bare_runner():
"""A GatewayRunner shell that only needs _bounded_adapter_teardown."""
return object.__new__(GatewayRunner)
@pytest.mark.asyncio
async def test_teardown_calls_both_methods(bare_runner):
"""The helper cancels background tasks AND disconnects, in that order."""
calls = []
adapter = MagicMock()
adapter.cancel_background_tasks = AsyncMock(
side_effect=lambda: calls.append("cancel")
)
adapter.disconnect = AsyncMock(side_effect=lambda: calls.append("disconnect"))
await bare_runner._bounded_adapter_teardown(adapter, Platform.TELEGRAM)
adapter.cancel_background_tasks.assert_awaited_once()
adapter.disconnect.assert_awaited_once()
assert calls == ["cancel", "disconnect"]
@pytest.mark.asyncio
async def test_teardown_bounds_hanging_disconnect(bare_runner, monkeypatch, caplog):
"""A wedged disconnect() must time out instead of hanging the loop."""
monkeypatch.setenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "0.01")
adapter = MagicMock()
adapter.cancel_background_tasks = AsyncMock(return_value=None)
async def hang():
await asyncio.sleep(60)
adapter.disconnect = AsyncMock(side_effect=hang)
with caplog.at_level(logging.WARNING, logger="gateway.run"):
await asyncio.wait_for(
bare_runner._bounded_adapter_teardown(adapter, Platform.FEISHU),
timeout=5.0, # the helper itself must return well under this
)
adapter.disconnect.assert_awaited_once()
assert "feishu disconnect timed out" in caplog.text
@pytest.mark.asyncio
async def test_teardown_bounds_hanging_cancel(bare_runner, monkeypatch, caplog):
"""A wedged cancel_background_tasks() must time out, then disconnect runs."""
monkeypatch.setenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "0.01")
adapter = MagicMock()
async def hang():
await asyncio.sleep(60)
adapter.cancel_background_tasks = AsyncMock(side_effect=hang)
adapter.disconnect = AsyncMock(return_value=None)
with caplog.at_level(logging.WARNING, logger="gateway.run"):
await asyncio.wait_for(
bare_runner._bounded_adapter_teardown(adapter, Platform.FEISHU),
timeout=5.0,
)
assert "feishu background-task cancel timed out" in caplog.text
# disconnect still attempted after the cancel timeout — forward progress.
adapter.disconnect.assert_awaited_once()
@pytest.mark.asyncio
async def test_teardown_swallows_exceptions(bare_runner):
"""Errors in either await must not propagate — shutdown continues."""
adapter = MagicMock()
adapter.cancel_background_tasks = AsyncMock(side_effect=RuntimeError("bg"))
adapter.disconnect = AsyncMock(side_effect=RuntimeError("disc"))
# Must NOT raise.
await bare_runner._bounded_adapter_teardown(adapter, Platform.TELEGRAM)
adapter.cancel_background_tasks.assert_awaited_once()
adapter.disconnect.assert_awaited_once()
@pytest.mark.asyncio
async def test_teardown_profile_suffix_in_logs(bare_runner, caplog):
"""Multiplex (secondary-profile) teardown tags log lines with the profile."""
adapter = MagicMock()
adapter.cancel_background_tasks = AsyncMock(return_value=None)
adapter.disconnect = AsyncMock(return_value=None)
with caplog.at_level(logging.INFO, logger="gateway.run"):
await bare_runner._bounded_adapter_teardown(
adapter, Platform.TELEGRAM, profile="acct2"
)
assert "(profile: acct2)" in caplog.text
@pytest.mark.asyncio
async def test_teardown_timeout_zero_disables_bound(bare_runner, monkeypatch):
"""timeout=0 disables the wait_for wrapper but still calls through."""
monkeypatch.setenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "0")
adapter = MagicMock()
adapter.cancel_background_tasks = AsyncMock(return_value=None)
adapter.disconnect = AsyncMock(return_value=None)
await bare_runner._bounded_adapter_teardown(adapter, Platform.TELEGRAM)
adapter.cancel_background_tasks.assert_awaited_once()
adapter.disconnect.assert_awaited_once()