fix(simplex): avoid reconnecting healthy idle websocket

Do not treat lack of application-level SimpleX events as a stale WebSocket. The websockets client already uses protocol ping/pong for connection liveness, so quiet but healthy connections should not be closed by the health monitor.
This commit is contained in:
maxcz79 2026-05-16 22:54:28 +02:00 committed by Teknium
parent e946f49ab5
commit 32032e1e2d
2 changed files with 73 additions and 30 deletions

View file

@ -269,7 +269,13 @@ class SimplexAdapter(BasePlatformAdapter):
# ------------------------------------------------------------------
async def _health_monitor(self) -> None:
"""Force reconnect if the WebSocket has been idle too long."""
"""Observe WebSocket idleness without reconnecting healthy quiet links.
simplex-chat can legitimately stay application-silent for long periods
when no messages arrive. The websockets client already sends protocol
pings (see _ws_listener ping_interval/ping_timeout), so treating lack of
chat events as a stale connection causes needless reconnect churn.
"""
while self._running:
await asyncio.sleep(HEALTH_CHECK_INTERVAL)
if not self._running:
@ -277,15 +283,7 @@ class SimplexAdapter(BasePlatformAdapter):
elapsed = time.time() - self._last_ws_activity
if elapsed > HEALTH_CHECK_STALE_THRESHOLD:
logger.warning(
"SimpleX: WS idle for %.0fs, forcing reconnect", elapsed
)
self._last_ws_activity = time.time()
if self._ws:
try:
await self._ws.close()
except Exception:
pass
logger.debug("SimpleX: WS application-idle for %.0fs", elapsed)
# ------------------------------------------------------------------
# Inbound event handling
@ -293,7 +291,12 @@ class SimplexAdapter(BasePlatformAdapter):
async def _handle_event(self, event: dict) -> None:
"""Dispatch a daemon event to the appropriate handler."""
resp_type = event.get("type") or event.get("resp", {}).get("type", "")
# simplex-chat WebSocket messages are usually shaped as:
# {"corrId": "...", "resp": {"type": "newChatItems", ...}}
# Older/examples may put the response fields at top-level. Normalize
# both forms before dispatching, otherwise inbound chatItems are lost.
resp = event.get("resp") if isinstance(event.get("resp"), dict) else event
resp_type = event.get("type") or resp.get("type", "")
# Filter responses to our own commands (echoes)
corr_id = event.get("corrId", "")
@ -302,10 +305,10 @@ class SimplexAdapter(BasePlatformAdapter):
return
if resp_type == "newChatItem":
await self._handle_new_chat_item(event)
await self._handle_new_chat_item(resp)
elif resp_type == "newChatItems":
# Batch variant — process each item
items = event.get("chatItems") or []
items = resp.get("chatItems") or []
for item_wrapper in items:
await self._handle_new_chat_item(item_wrapper)
# Ignore all other event types (delivery receipts, contact updates, etc.)
@ -347,7 +350,9 @@ class SimplexAdapter(BasePlatformAdapter):
or contact_info.get("localDisplayName")
or contact_id
)
chat_id = contact_id
# Replies must be routed by SimpleX CLI display name, while
# authorization should still use the stable numeric contactId.
chat_id = contact_name or contact_id
chat_name = contact_name
if not chat_id:
@ -364,7 +369,7 @@ class SimplexAdapter(BasePlatformAdapter):
or sender_id
)
else:
sender_id = chat_id
sender_id = contact_id if not is_group else chat_id
sender_name = chat_name
# Extract text
@ -508,7 +513,11 @@ class SimplexAdapter(BasePlatformAdapter):
group_id = chat_id[6:]
cmd_str = f"#[{group_id}] {content}"
else:
cmd_str = f"@[{chat_id}] {content}"
# SimpleX CLI addresses direct contacts by display name, e.g.
# `@Alice hello`. `@[Alice]` is interpreted literally as a contact
# named "[Alice]" and `@[4]` as "[4]", so do not wrap direct
# chat IDs / display names in brackets.
cmd_str = f"@{chat_id} {content}"
payload = {
"corrId": corr_id,
@ -643,7 +652,8 @@ async def _standalone_send(
group_id = chat_id[6:]
cmd_str = f"#[{group_id}] {message}"
else:
cmd_str = f"@[{chat_id}] {message}"
# Direct contacts are addressed by display name without brackets.
cmd_str = f"@{chat_id} {message}"
payload = {
"corrId": f"hermes-snd-{int(time.time() * 1000)}",

View file

@ -7,6 +7,7 @@ sibling platform-plugin tests on the same xdist worker.
from __future__ import annotations
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock
@ -214,7 +215,7 @@ async def test_send_dm():
result = await adapter.send("contact-42", "Hello, SimpleX!")
mock_ws.send.assert_called_once()
payload = json.loads(mock_ws.send.call_args[0][0])
assert payload["cmd"] == "@[contact-42] Hello, SimpleX!"
assert payload["cmd"] == "@contact-42 Hello, SimpleX!"
assert payload["corrId"].startswith(_CORR_PREFIX)
assert result.success is True
@ -301,23 +302,55 @@ async def test_standalone_send_missing_websockets(monkeypatch):
@pytest.mark.asyncio
async def test_standalone_send_missing_url(monkeypatch):
async def test_standalone_send_defaults_to_local_daemon(monkeypatch):
monkeypatch.delenv("SIMPLEX_WS_URL", raising=False)
pconfig = MagicMock()
pconfig.extra = {}
# We expect the URL fallback (extra+env both empty) to be empty string,
# producing an error. We also need websockets to be importable for the
# url-check branch to be reached, so skip when it's not.
try:
import websockets.client # noqa: F401
except ImportError:
pytest.skip("websockets not installed")
sent_payloads = []
class DummyWs:
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return None
async def send(self, payload):
sent_payloads.append(json.loads(payload))
def fake_connect(url, **kwargs):
assert url == "ws://127.0.0.1:5225"
assert kwargs["open_timeout"] == 10
assert kwargs["close_timeout"] == 5
return DummyWs()
import websockets
monkeypatch.setattr(websockets, "connect", fake_connect)
result = await _standalone_send(pconfig, "contact-42", "hi")
assert isinstance(result, dict)
# Either error about URL or a connection attempt failure — both are valid
# signals that the standalone path requires configuration.
assert "error" in result
assert result == {"success": True, "platform": "simplex", "chat_id": "contact-42"}
assert sent_payloads[0]["cmd"] == "@contact-42 hi"
@pytest.mark.asyncio
async def test_health_monitor_does_not_reconnect_quiet_healthy_ws(monkeypatch):
from gateway.config import PlatformConfig
cfg = PlatformConfig(enabled=True, extra={"ws_url": "ws://localhost:5225"})
adapter = SimplexAdapter(cfg)
adapter._running = True
adapter._last_ws_activity = 0
adapter._ws = AsyncMock()
monkeypatch.setattr(_simplex, "HEALTH_CHECK_INTERVAL", 0.01)
monkeypatch.setattr(_simplex, "HEALTH_CHECK_STALE_THRESHOLD", 0.01)
task = asyncio.create_task(adapter._health_monitor())
await asyncio.sleep(0.03)
adapter._running = False
await asyncio.wait_for(task, timeout=1)
adapter._ws.close.assert_not_called()
# ---------------------------------------------------------------------------