From 31f22890eaf15fe6fb027a8335335e98ad7e8242 Mon Sep 17 00:00:00 2001 From: LeonSGP43 Date: Sun, 3 May 2026 21:13:50 +0800 Subject: [PATCH] fix(matrix): defer reaction cleanup redactions --- gateway/platforms/matrix.py | 57 +++++++++++++++++++++++++++++++----- tests/gateway/test_matrix.py | 45 ++++++++++++++++++++++++++-- 2 files changed, 93 insertions(+), 9 deletions(-) diff --git a/gateway/platforms/matrix.py b/gateway/platforms/matrix.py index e3bcd24c5e..021fa8e732 100644 --- a/gateway/platforms/matrix.py +++ b/gateway/platforms/matrix.py @@ -364,6 +364,12 @@ class MatrixAdapter(BasePlatformAdapter): "MATRIX_REACTIONS", "true" ).lower() not in ("false", "0", "no") self._pending_reactions: dict[tuple[str, str], str] = {} + # Delay before redacting reactions so Matrix homeservers have time to + # deliver the final message event without tripping "missing event" + # errors in some clients. 5s is empirically safe; not user-tunable — + # if that changes, add a config.yaml entry rather than an env var. + self._reaction_redaction_delay_seconds = 5.0 + self._reaction_redaction_tasks: Set[asyncio.Task] = set() # Proxy support — resolve once at init, reuse for all HTTP traffic. self._proxy_url: str | None = resolve_proxy_url(platform_env_var="MATRIX_PROXY") @@ -851,6 +857,14 @@ class MatrixAdapter(BasePlatformAdapter): except (asyncio.CancelledError, Exception): pass + redaction_tasks = list(self._reaction_redaction_tasks) + for task in redaction_tasks: + if not task.done(): + task.cancel() + if redaction_tasks: + await asyncio.gather(*redaction_tasks, return_exceptions=True) + self._reaction_redaction_tasks.clear() + # Close the SQLite crypto store database. if hasattr(self, "_crypto_db") and self._crypto_db: try: @@ -1929,6 +1943,35 @@ class MatrixAdapter(BasePlatformAdapter): """Remove a reaction by redacting its event.""" return await self.redact_message(room_id, reaction_event_id, reason) + def _schedule_reaction_redaction( + self, + room_id: str, + reaction_event_id: str, + reason: str = "", + ) -> None: + """Redact a reaction after a short delay so message delivery settles.""" + + async def _redact_later() -> None: + try: + if self._reaction_redaction_delay_seconds: + await asyncio.sleep(self._reaction_redaction_delay_seconds) + if not await self._redact_reaction(room_id, reaction_event_id, reason): + logger.debug( + "Matrix: failed to redact reaction %s", reaction_event_id + ) + except asyncio.CancelledError: + raise + except Exception as exc: + logger.debug( + "Matrix: delayed reaction redaction failed for %s: %s", + reaction_event_id, + exc, + ) + + task = asyncio.create_task(_redact_later()) + self._reaction_redaction_tasks.add(task) + task.add_done_callback(self._reaction_redaction_tasks.discard) + async def on_processing_start(self, event: MessageEvent) -> None: """Add eyes reaction when the agent starts processing a message.""" if not self._reactions_enabled: @@ -1957,8 +2000,11 @@ class MatrixAdapter(BasePlatformAdapter): reaction_key = (room_id, msg_id) if reaction_key in self._pending_reactions: eyes_event_id = self._pending_reactions.pop(reaction_key) - if not await self._redact_reaction(room_id, eyes_event_id): - logger.debug("Matrix: failed to redact eyes reaction %s", eyes_event_id) + self._schedule_reaction_redaction( + room_id, + eyes_event_id, + "processing complete", + ) await self._send_reaction( room_id, msg_id, @@ -2037,11 +2083,8 @@ class MatrixAdapter(BasePlatformAdapter): ) -> None: """Redact the bot's seed ✅/❎ reactions, leaving only the user's reaction.""" for emoji, evt_id in prompt.bot_reaction_events.items(): - try: - await self.redact_message(room_id, evt_id, "approval resolved") - logger.debug("Matrix: redacted bot reaction %s (%s)", emoji, evt_id) - except Exception as exc: - logger.debug("Matrix: failed to redact bot reaction %s: %s", emoji, exc) + self._schedule_reaction_redaction(room_id, evt_id, "approval resolved") + logger.debug("Matrix: scheduled bot reaction redaction %s (%s)", emoji, evt_id) # ------------------------------------------------------------------ # Text message aggregation (handles Matrix client-side splits) diff --git a/tests/gateway/test_matrix.py b/tests/gateway/test_matrix.py index 75e1a1e148..bd95fb6136 100644 --- a/tests/gateway/test_matrix.py +++ b/tests/gateway/test_matrix.py @@ -1738,6 +1738,7 @@ class TestMatrixReactions: from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome self.adapter._reactions_enabled = True + self.adapter._reaction_redaction_delay_seconds = 0.01 self.adapter._pending_reactions = {("!room:ex", "$msg1"): "$eyes_reaction_123"} self.adapter._redact_reaction = AsyncMock(return_value=True) self.adapter._send_reaction = AsyncMock(return_value="$check_reaction_456") @@ -1752,14 +1753,21 @@ class TestMatrixReactions: message_id="$msg1", ) await self.adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS) - self.adapter._redact_reaction.assert_called_once_with("!room:ex", "$eyes_reaction_123") + self.adapter._redact_reaction.assert_not_awaited() self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "\u2705") + await asyncio.sleep(0.03) + self.adapter._redact_reaction.assert_awaited_once_with( + "!room:ex", + "$eyes_reaction_123", + "processing complete", + ) @pytest.mark.asyncio async def test_on_processing_complete_sends_cross_on_failure(self): from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome self.adapter._reactions_enabled = True + self.adapter._reaction_redaction_delay_seconds = 0.01 self.adapter._pending_reactions = {("!room:ex", "$msg1"): "$eyes_reaction_123"} self.adapter._redact_reaction = AsyncMock(return_value=True) self.adapter._send_reaction = AsyncMock(return_value="$cross_reaction_456") @@ -1774,8 +1782,14 @@ class TestMatrixReactions: message_id="$msg1", ) await self.adapter.on_processing_complete(event, ProcessingOutcome.FAILURE) - self.adapter._redact_reaction.assert_called_once_with("!room:ex", "$eyes_reaction_123") + self.adapter._redact_reaction.assert_not_awaited() self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "\u274c") + await asyncio.sleep(0.03) + self.adapter._redact_reaction.assert_awaited_once_with( + "!room:ex", + "$eyes_reaction_123", + "processing complete", + ) @pytest.mark.asyncio async def test_on_processing_complete_cancelled_sends_no_terminal_reaction(self): @@ -1819,6 +1833,33 @@ class TestMatrixReactions: self.adapter._redact_reaction.assert_not_called() self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "\u2705") + @pytest.mark.asyncio + async def test_approval_reaction_cleanup_is_delayed(self): + """Bot approval reaction redactions should not run inline.""" + + self.adapter._reaction_redaction_delay_seconds = 0.01 + self.adapter._redact_reaction = AsyncMock(return_value=True) + prompt = MagicMock() + prompt.bot_reaction_events = { + "\u2705": "$allow_reaction", + "\u274e": "$deny_reaction", + } + + await self.adapter._redact_bot_approval_reactions("!room:ex", prompt) + + self.adapter._redact_reaction.assert_not_awaited() + await asyncio.sleep(0.03) + self.adapter._redact_reaction.assert_any_await( + "!room:ex", + "$allow_reaction", + "approval resolved", + ) + self.adapter._redact_reaction.assert_any_await( + "!room:ex", + "$deny_reaction", + "approval resolved", + ) + @pytest.mark.asyncio async def test_reactions_disabled(self): from gateway.platforms.base import MessageEvent, MessageType