mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-12 03:42:08 +00:00
fix(gateway): cap adapter disconnect during stop
This commit is contained in:
parent
524cbabd89
commit
dccf1fb6e0
4 changed files with 79 additions and 2 deletions
|
|
@ -61,6 +61,7 @@ from hermes_cli.config import cfg_get
|
||||||
_AGENT_CACHE_MAX_SIZE = 128
|
_AGENT_CACHE_MAX_SIZE = 128
|
||||||
_AGENT_CACHE_IDLE_TTL_SECS = 3600.0 # evict agents idle for >1h
|
_AGENT_CACHE_IDLE_TTL_SECS = 3600.0 # evict agents idle for >1h
|
||||||
_PLATFORM_CONNECT_TIMEOUT_SECS_DEFAULT = 30.0
|
_PLATFORM_CONNECT_TIMEOUT_SECS_DEFAULT = 30.0
|
||||||
|
_ADAPTER_DISCONNECT_TIMEOUT_SECS_DEFAULT = 5.0
|
||||||
_TELEGRAM_COMMAND_MENTION_RE = re.compile(r"(?<![\w:/])/([A-Za-z0-9][A-Za-z0-9_-]*)")
|
_TELEGRAM_COMMAND_MENTION_RE = re.compile(r"(?<![\w:/])/([A-Za-z0-9][A-Za-z0-9_-]*)")
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -1494,8 +1495,18 @@ class GatewayRunner:
|
||||||
Must tolerate partial-init state and never raise, since callers
|
Must tolerate partial-init state and never raise, since callers
|
||||||
use it inside error-handling blocks.
|
use it inside error-handling blocks.
|
||||||
"""
|
"""
|
||||||
|
timeout = self._adapter_disconnect_timeout_secs()
|
||||||
try:
|
try:
|
||||||
await adapter.disconnect()
|
if timeout <= 0:
|
||||||
|
await adapter.disconnect()
|
||||||
|
else:
|
||||||
|
await asyncio.wait_for(adapter.disconnect(), timeout=timeout)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
logger.warning(
|
||||||
|
"Timed out after %.1fs while disconnecting %s adapter; continuing shutdown",
|
||||||
|
timeout,
|
||||||
|
platform.value if platform is not None else "adapter",
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Defensive %s disconnect after failed connect raised: %s",
|
"Defensive %s disconnect after failed connect raised: %s",
|
||||||
|
|
@ -1503,6 +1514,21 @@ class GatewayRunner:
|
||||||
e,
|
e,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _adapter_disconnect_timeout_secs(self) -> float:
    """Return the per-adapter disconnect timeout used during shutdown.

    The value is read, in seconds, from the
    ``HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT`` environment variable.

    Returns:
        The parsed timeout, with negative values clamped to ``0.0``
        (``_safe_adapter_disconnect`` awaits the disconnect without a cap
        when the timeout is ``<= 0``).  When the variable is unset, blank,
        not parseable as a float, or parses to NaN,
        ``_ADAPTER_DISCONNECT_TIMEOUT_SECS_DEFAULT`` is returned; the
        invalid cases also log a warning.
    """
    import math  # local import: only needed for the NaN guard below

    raw = os.getenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "").strip()
    if not raw:
        return _ADAPTER_DISCONNECT_TIMEOUT_SECS_DEFAULT

    try:
        timeout = float(raw)
    except ValueError:
        timeout = math.nan

    # float() happily parses "nan", and max(0.0, nan) evaluates to 0.0, which
    # would silently switch the disconnect cap off.  Treat NaN like any other
    # invalid setting instead.
    if math.isnan(timeout):
        logger.warning(
            "Ignoring invalid HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT=%r",
            raw,
        )
        return _ADAPTER_DISCONNECT_TIMEOUT_SECS_DEFAULT

    return max(0.0, timeout)
|
||||||
|
|
||||||
def _platform_connect_timeout_secs(self) -> float:
|
def _platform_connect_timeout_secs(self) -> float:
|
||||||
"""Return the per-platform connect timeout used during startup/retry."""
|
"""Return the per-platform connect timeout used during startup/retry."""
|
||||||
raw = os.getenv("HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT", "").strip()
|
raw = os.getenv("HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT", "").strip()
|
||||||
|
|
|
||||||
|
|
@ -2387,7 +2387,15 @@ def systemd_stop(system: bool = False):
|
||||||
write_planned_stop_marker(pid)
|
write_planned_stop_marker(pid)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
_run_systemctl(["stop", get_service_name()], system=system, check=True, timeout=90)
|
try:
|
||||||
|
_run_systemctl(["stop", get_service_name()], system=system, check=True, timeout=90)
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
label = _service_scope_label(system)
|
||||||
|
print(
|
||||||
|
f"Gateway {label} service is still stopping after 90s; "
|
||||||
|
"check `hermes gateway status` or logs for final shutdown state."
|
||||||
|
)
|
||||||
|
return
|
||||||
print(f"✓ {_service_scope_label(system).capitalize()} service stopped")
|
print(f"✓ {_service_scope_label(system).capitalize()} service stopped")
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,8 @@ The fix: gateway/run.py wraps each adapter connect() with a safety-net
|
||||||
call to _safe_adapter_disconnect() in the failure branches.
|
call to _safe_adapter_disconnect() in the failure branches.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
from unittest.mock import AsyncMock, MagicMock
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
@ -57,3 +59,21 @@ async def test_safe_disconnect_handles_none_platform(bare_runner):
|
||||||
await bare_runner._safe_adapter_disconnect(adapter, None)
|
await bare_runner._safe_adapter_disconnect(adapter, None)
|
||||||
|
|
||||||
adapter.disconnect.assert_awaited_once()
|
adapter.disconnect.assert_awaited_once()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_safe_disconnect_times_out_and_continues(bare_runner, monkeypatch, caplog):
    """A disconnect that never returns is abandoned once the timeout elapses."""
    # Shrink the configured timeout so the test completes almost immediately.
    monkeypatch.setenv("HERMES_GATEWAY_ADAPTER_DISCONNECT_TIMEOUT", "0.001")

    async def _stuck_disconnect():
        # Sleeps far longer than the timeout above, standing in for a wedged adapter.
        await asyncio.sleep(60)

    wedged_adapter = MagicMock()
    wedged_adapter.disconnect = AsyncMock(side_effect=_stuck_disconnect)

    with caplog.at_level(logging.WARNING, logger="gateway.run"):
        await bare_runner._safe_adapter_disconnect(wedged_adapter, Platform.FEISHU)

    wedged_adapter.disconnect.assert_awaited_once()
    # The warning renders the 0.001s timeout with a single decimal place.
    expected_fragment = "Timed out after 0.0s while disconnecting feishu adapter"
    assert expected_fragment in caplog.text
|
||||||
|
|
|
||||||
|
|
@ -140,6 +140,29 @@ class TestSystemdServiceRefresh:
|
||||||
assert markers == [321]
|
assert markers == [321]
|
||||||
assert calls == [["stop", gateway_cli.get_service_name()]]
|
assert calls == [["stop", gateway_cli.get_service_name()]]
|
||||||
|
|
||||||
|
def test_systemd_stop_timeout_prints_status_guidance(self, monkeypatch, capsys):
    """systemd_stop() prints follow-up guidance when ``systemctl stop`` times out."""
    recorded_pids = []

    def record_marker(pid):
        # Stand-in for write_planned_stop_marker: remember the pid, report success.
        recorded_pids.append(pid)
        return True

    def timing_out_systemctl(args, **kwargs):
        # Simulate systemctl exceeding the timeout it was invoked with.
        raise subprocess.TimeoutExpired(args, kwargs.get("timeout"))

    monkeypatch.setattr(gateway_cli, "_select_systemd_scope", lambda system=False: False)
    monkeypatch.setattr(gateway_cli, "_require_service_installed", lambda action, system=False: None)
    monkeypatch.setattr(status, "get_running_pid", lambda cleanup_stale=True: 321)
    monkeypatch.setattr(status, "write_planned_stop_marker", record_marker)
    monkeypatch.setattr(gateway_cli, "_run_systemctl", timing_out_systemctl)

    gateway_cli.systemd_stop()

    # The planned-stop marker is still written for the running pid.
    assert recorded_pids == [321]
    printed = capsys.readouterr().out
    assert "still stopping after 90s" in printed
    assert "hermes gateway status" in printed
|
||||||
|
|
||||||
def test_run_gateway_refreshes_outdated_unit_on_boot(self, tmp_path, monkeypatch):
|
def test_run_gateway_refreshes_outdated_unit_on_boot(self, tmp_path, monkeypatch):
|
||||||
"""run_gateway() should refresh the systemd unit on boot so that
|
"""run_gateway() should refresh the systemd unit on boot so that
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue