fix(matrix): defer reaction cleanup redactions

This commit is contained in:
LeonSGP43 2026-05-03 21:13:50 +08:00 committed by Teknium
parent 8cef149131
commit 31f22890ea
2 changed files with 93 additions and 9 deletions

View file

@ -364,6 +364,12 @@ class MatrixAdapter(BasePlatformAdapter):
"MATRIX_REACTIONS", "true"
).lower() not in ("false", "0", "no")
self._pending_reactions: dict[tuple[str, str], str] = {}
# Delay before redacting reactions so Matrix homeservers have time to
# deliver the final message event without tripping "missing event"
# errors in some clients. 5s is empirically safe; not user-tunable —
# if that changes, add a config.yaml entry rather than an env var.
self._reaction_redaction_delay_seconds = 5.0
self._reaction_redaction_tasks: Set[asyncio.Task] = set()
# Proxy support — resolve once at init, reuse for all HTTP traffic.
self._proxy_url: str | None = resolve_proxy_url(platform_env_var="MATRIX_PROXY")
@ -851,6 +857,14 @@ class MatrixAdapter(BasePlatformAdapter):
except (asyncio.CancelledError, Exception):
pass
redaction_tasks = list(self._reaction_redaction_tasks)
for task in redaction_tasks:
if not task.done():
task.cancel()
if redaction_tasks:
await asyncio.gather(*redaction_tasks, return_exceptions=True)
self._reaction_redaction_tasks.clear()
# Close the SQLite crypto store database.
if hasattr(self, "_crypto_db") and self._crypto_db:
try:
@ -1929,6 +1943,35 @@ class MatrixAdapter(BasePlatformAdapter):
"""Remove a reaction by redacting its event."""
return await self.redact_message(room_id, reaction_event_id, reason)
def _schedule_reaction_redaction(
self,
room_id: str,
reaction_event_id: str,
reason: str = "",
) -> None:
"""Redact a reaction after a short delay so message delivery settles."""
async def _redact_later() -> None:
try:
if self._reaction_redaction_delay_seconds:
await asyncio.sleep(self._reaction_redaction_delay_seconds)
if not await self._redact_reaction(room_id, reaction_event_id, reason):
logger.debug(
"Matrix: failed to redact reaction %s", reaction_event_id
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.debug(
"Matrix: delayed reaction redaction failed for %s: %s",
reaction_event_id,
exc,
)
task = asyncio.create_task(_redact_later())
self._reaction_redaction_tasks.add(task)
task.add_done_callback(self._reaction_redaction_tasks.discard)
async def on_processing_start(self, event: MessageEvent) -> None:
"""Add eyes reaction when the agent starts processing a message."""
if not self._reactions_enabled:
@ -1957,8 +2000,11 @@ class MatrixAdapter(BasePlatformAdapter):
reaction_key = (room_id, msg_id)
if reaction_key in self._pending_reactions:
eyes_event_id = self._pending_reactions.pop(reaction_key)
if not await self._redact_reaction(room_id, eyes_event_id):
logger.debug("Matrix: failed to redact eyes reaction %s", eyes_event_id)
self._schedule_reaction_redaction(
room_id,
eyes_event_id,
"processing complete",
)
await self._send_reaction(
room_id,
msg_id,
@ -2037,11 +2083,8 @@ class MatrixAdapter(BasePlatformAdapter):
) -> None:
"""Redact the bot's seed ✅/❎ reactions, leaving only the user's reaction."""
for emoji, evt_id in prompt.bot_reaction_events.items():
try:
await self.redact_message(room_id, evt_id, "approval resolved")
logger.debug("Matrix: redacted bot reaction %s (%s)", emoji, evt_id)
except Exception as exc:
logger.debug("Matrix: failed to redact bot reaction %s: %s", emoji, exc)
self._schedule_reaction_redaction(room_id, evt_id, "approval resolved")
logger.debug("Matrix: scheduled bot reaction redaction %s (%s)", emoji, evt_id)
# ------------------------------------------------------------------
# Text message aggregation (handles Matrix client-side splits)

View file

@ -1738,6 +1738,7 @@ class TestMatrixReactions:
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
self.adapter._reactions_enabled = True
self.adapter._reaction_redaction_delay_seconds = 0.01
self.adapter._pending_reactions = {("!room:ex", "$msg1"): "$eyes_reaction_123"}
self.adapter._redact_reaction = AsyncMock(return_value=True)
self.adapter._send_reaction = AsyncMock(return_value="$check_reaction_456")
@ -1752,14 +1753,21 @@ class TestMatrixReactions:
message_id="$msg1",
)
await self.adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
self.adapter._redact_reaction.assert_called_once_with("!room:ex", "$eyes_reaction_123")
self.adapter._redact_reaction.assert_not_awaited()
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "\u2705")
await asyncio.sleep(0.03)
self.adapter._redact_reaction.assert_awaited_once_with(
"!room:ex",
"$eyes_reaction_123",
"processing complete",
)
@pytest.mark.asyncio
async def test_on_processing_complete_sends_cross_on_failure(self):
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
self.adapter._reactions_enabled = True
self.adapter._reaction_redaction_delay_seconds = 0.01
self.adapter._pending_reactions = {("!room:ex", "$msg1"): "$eyes_reaction_123"}
self.adapter._redact_reaction = AsyncMock(return_value=True)
self.adapter._send_reaction = AsyncMock(return_value="$cross_reaction_456")
@ -1774,8 +1782,14 @@ class TestMatrixReactions:
message_id="$msg1",
)
await self.adapter.on_processing_complete(event, ProcessingOutcome.FAILURE)
self.adapter._redact_reaction.assert_called_once_with("!room:ex", "$eyes_reaction_123")
self.adapter._redact_reaction.assert_not_awaited()
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "\u274c")
await asyncio.sleep(0.03)
self.adapter._redact_reaction.assert_awaited_once_with(
"!room:ex",
"$eyes_reaction_123",
"processing complete",
)
@pytest.mark.asyncio
async def test_on_processing_complete_cancelled_sends_no_terminal_reaction(self):
@ -1819,6 +1833,33 @@ class TestMatrixReactions:
self.adapter._redact_reaction.assert_not_called()
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "\u2705")
@pytest.mark.asyncio
async def test_approval_reaction_cleanup_is_delayed(self):
"""Bot approval reaction redactions should not run inline."""
self.adapter._reaction_redaction_delay_seconds = 0.01
self.adapter._redact_reaction = AsyncMock(return_value=True)
prompt = MagicMock()
prompt.bot_reaction_events = {
"\u2705": "$allow_reaction",
"\u274e": "$deny_reaction",
}
await self.adapter._redact_bot_approval_reactions("!room:ex", prompt)
self.adapter._redact_reaction.assert_not_awaited()
await asyncio.sleep(0.03)
self.adapter._redact_reaction.assert_any_await(
"!room:ex",
"$allow_reaction",
"approval resolved",
)
self.adapter._redact_reaction.assert_any_await(
"!room:ex",
"$deny_reaction",
"approval resolved",
)
@pytest.mark.asyncio
async def test_reactions_disabled(self):
from gateway.platforms.base import MessageEvent, MessageType