mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-13 09:01:54 +00:00
fix(matrix): reconcile pending invites from sync state
This commit is contained in:
parent
6649e7e746
commit
57f8cf00e9
2 changed files with 58 additions and 0 deletions
|
|
@ -667,6 +667,7 @@ class MatrixAdapter(BasePlatformAdapter):
|
|||
await asyncio.gather(*tasks)
|
||||
except Exception as exc:
|
||||
logger.warning("Matrix: initial sync event dispatch error: %s", exc)
|
||||
await self._join_pending_invites(sync_data)
|
||||
else:
|
||||
logger.warning(
|
||||
"Matrix: initial sync returned unexpected type %s",
|
||||
|
|
@ -1143,6 +1144,7 @@ class MatrixAdapter(BasePlatformAdapter):
|
|||
await asyncio.gather(*tasks)
|
||||
except Exception as exc:
|
||||
logger.warning("Matrix: sync event dispatch error: %s", exc)
|
||||
await self._join_pending_invites(sync_data)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
|
|
@ -1623,13 +1625,35 @@ class MatrixAdapter(BasePlatformAdapter):
|
|||
"Matrix: invited to %s — joining",
|
||||
room_id,
|
||||
)
|
||||
await self._join_room_by_id(room_id)
|
||||
|
||||
async def _join_room_by_id(self, room_id: str) -> bool:
|
||||
"""Join a room by ID and refresh local caches on success."""
|
||||
if not room_id:
|
||||
return False
|
||||
if room_id in self._joined_rooms:
|
||||
return True
|
||||
try:
|
||||
await self._client.join_room(RoomID(room_id))
|
||||
self._joined_rooms.add(room_id)
|
||||
logger.info("Matrix: joined %s", room_id)
|
||||
await self._refresh_dm_cache()
|
||||
return True
|
||||
except Exception as exc:
|
||||
logger.warning("Matrix: error joining %s: %s", room_id, exc)
|
||||
return False
|
||||
|
||||
async def _join_pending_invites(self, sync_data: Dict[str, Any]) -> None:
|
||||
"""Join rooms still present in rooms.invite after sync processing."""
|
||||
rooms = sync_data.get("rooms", {}) if isinstance(sync_data, dict) else {}
|
||||
invites = rooms.get("invite", {})
|
||||
if not isinstance(invites, dict):
|
||||
return
|
||||
for room_id in invites:
|
||||
if room_id in self._joined_rooms:
|
||||
continue
|
||||
logger.info("Matrix: reconciling pending invite for %s", room_id)
|
||||
await self._join_room_by_id(str(room_id))
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Reactions (send, receive, processing lifecycle)
|
||||
|
|
|
|||
|
|
@ -1204,6 +1204,40 @@ class TestMatrixSyncLoop:
|
|||
fake_client.handle_sync.assert_called_once()
|
||||
mock_sync_store.put_next_batch.assert_awaited_once_with("s1234")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sync_loop_reconciles_pending_invites(self):
|
||||
"""Pending rooms.invite entries should be joined if callbacks were missed."""
|
||||
adapter = _make_adapter()
|
||||
adapter._closing = False
|
||||
|
||||
async def _sync_once(**kwargs):
|
||||
adapter._closing = True
|
||||
return {
|
||||
"rooms": {
|
||||
"join": {"!joined:example.org": {}},
|
||||
"invite": {"!invited:example.org": {}},
|
||||
},
|
||||
"next_batch": "s1234",
|
||||
}
|
||||
|
||||
mock_sync_store = MagicMock()
|
||||
mock_sync_store.get_next_batch = AsyncMock(return_value=None)
|
||||
mock_sync_store.put_next_batch = AsyncMock()
|
||||
|
||||
fake_client = MagicMock()
|
||||
fake_client.sync = AsyncMock(side_effect=_sync_once)
|
||||
fake_client.join_room = AsyncMock()
|
||||
fake_client.sync_store = mock_sync_store
|
||||
fake_client.handle_sync = MagicMock(return_value=[])
|
||||
adapter._client = fake_client
|
||||
|
||||
with patch.object(adapter, "_refresh_dm_cache", AsyncMock()):
|
||||
await adapter._sync_loop()
|
||||
|
||||
fake_client.join_room.assert_awaited_once()
|
||||
assert "!joined:example.org" in adapter._joined_rooms
|
||||
assert "!invited:example.org" in adapter._joined_rooms
|
||||
|
||||
|
||||
class TestMatrixUploadAndSend:
|
||||
@pytest.mark.asyncio
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue