fix(gateway/slack): align reaction lifecycle with Discord/Telegram pattern

Slack reactions were placed around handle_message(), which returns
immediately after spawning a background task. This caused the 👀 swap to happen before any real work began.

Fix: implement on_processing_start / on_processing_complete callbacks
(matching Discord/Telegram) so reactions bracket actual _message_handler
work driven by the base class.

Also fixes missing stop_typing() for Slack's assistant thread status
indicator, which left 'is thinking...' stuck in the UI after processing
completed.

- Add _reacting_message_ids set for DM/@mention-only gating
- Add _active_status_threads dict for stop_typing lookup
- Update test_reactions_in_message_flow for new callback pattern
- Add test_reactions_failure_outcome and test_reactions_skipped_for_non_dm_non_mention
This commit is contained in:
Roopak Nijhara 2026-04-22 19:57:51 +05:30 committed by kshitij
parent 04e039f687
commit 70a33708e7
2 changed files with 133 additions and 9 deletions

View file

@ -38,6 +38,7 @@ from gateway.platforms.base import (
BasePlatformAdapter, BasePlatformAdapter,
MessageEvent, MessageEvent,
MessageType, MessageType,
ProcessingOutcome,
SendResult, SendResult,
SUPPORTED_DOCUMENT_TYPES, SUPPORTED_DOCUMENT_TYPES,
safe_url_for_log, safe_url_for_log,
@ -113,6 +114,11 @@ class SlackAdapter(BasePlatformAdapter):
# Cache for _fetch_thread_context results: cache_key → _ThreadContextCache # Cache for _fetch_thread_context results: cache_key → _ThreadContextCache
self._thread_context_cache: Dict[str, _ThreadContextCache] = {} self._thread_context_cache: Dict[str, _ThreadContextCache] = {}
self._THREAD_CACHE_TTL = 60.0 self._THREAD_CACHE_TTL = 60.0
# Track message IDs that should get reaction lifecycle (DMs / @mentions).
self._reacting_message_ids: set = set()
# Track active assistant thread status indicators so stop_typing can
# clear them (chat_id → thread_ts).
self._active_status_threads: Dict[str, str] = {}
async def connect(self) -> bool: async def connect(self) -> bool:
"""Connect to Slack via Socket Mode.""" """Connect to Slack via Socket Mode."""
@ -362,6 +368,7 @@ class SlackAdapter(BasePlatformAdapter):
if not thread_ts: if not thread_ts:
return # Can only set status in a thread context return # Can only set status in a thread context
self._active_status_threads[chat_id] = thread_ts
try: try:
await self._get_client(chat_id).assistant_threads_setStatus( await self._get_client(chat_id).assistant_threads_setStatus(
channel_id=chat_id, channel_id=chat_id,
@ -373,6 +380,22 @@ class SlackAdapter(BasePlatformAdapter):
# in an assistant-enabled context. Falls back to reactions. # in an assistant-enabled context. Falls back to reactions.
logger.debug("[Slack] assistant.threads.setStatus failed: %s", e) logger.debug("[Slack] assistant.threads.setStatus failed: %s", e)
async def stop_typing(self, chat_id: str) -> None:
"""Clear the assistant thread status indicator."""
if not self._app:
return
thread_ts = self._active_status_threads.pop(chat_id, None)
if not thread_ts:
return
try:
await self._get_client(chat_id).assistant_threads_setStatus(
channel_id=chat_id,
thread_ts=thread_ts,
status="",
)
except Exception as e:
logger.debug("[Slack] assistant.threads.setStatus clear failed: %s", e)
def _dm_top_level_threads_as_sessions(self) -> bool: def _dm_top_level_threads_as_sessions(self) -> bool:
"""Whether top-level Slack DMs get per-message session threads. """Whether top-level Slack DMs get per-message session threads.
@ -584,6 +607,30 @@ class SlackAdapter(BasePlatformAdapter):
logger.debug("[Slack] reactions.remove failed (%s): %s", emoji, e) logger.debug("[Slack] reactions.remove failed (%s): %s", emoji, e)
return False return False
async def on_processing_start(self, event: MessageEvent) -> None:
"""Add an in-progress reaction when message processing begins."""
ts = getattr(event, "message_id", None)
if not ts or ts not in self._reacting_message_ids:
return
channel_id = getattr(event.source, "chat_id", None)
if channel_id:
await self._add_reaction(channel_id, ts, "eyes")
async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
"""Swap the in-progress reaction for a final success/failure reaction."""
ts = getattr(event, "message_id", None)
if not ts or ts not in self._reacting_message_ids:
return
self._reacting_message_ids.discard(ts)
channel_id = getattr(event.source, "chat_id", None)
if not channel_id:
return
await self._remove_reaction(channel_id, ts, "eyes")
if outcome == ProcessingOutcome.SUCCESS:
await self._add_reaction(channel_id, ts, "white_check_mark")
elif outcome == ProcessingOutcome.FAILURE:
await self._add_reaction(channel_id, ts, "x")
# ----- User identity resolution ----- # ----- User identity resolution -----
async def _resolve_user_name(self, user_id: str, chat_id: str = "") -> str: async def _resolve_user_name(self, user_id: str, chat_id: str = "") -> str:
@ -1214,16 +1261,11 @@ class SlackAdapter(BasePlatformAdapter):
# In listen-all channels (require_mention=false), reacting to every # In listen-all channels (require_mention=false), reacting to every
# casual message would be noisy. # casual message would be noisy.
_should_react = is_dm or is_mentioned _should_react = is_dm or is_mentioned
if _should_react: if _should_react:
await self._add_reaction(channel_id, ts, "eyes") self._reacting_message_ids.add(ts)
await self.handle_message(msg_event) await self.handle_message(msg_event)
if _should_react:
await self._remove_reaction(channel_id, ts, "eyes")
await self._add_reaction(channel_id, ts, "white_check_mark")
# ----- Approval button support (Block Kit) ----- # ----- Approval button support (Block Kit) -----
async def send_exec_approval( async def send_exec_approval(

View file

@ -1031,7 +1031,7 @@ class TestReactions:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_reactions_in_message_flow(self, adapter): async def test_reactions_in_message_flow(self, adapter):
"""Reactions should be added on receipt and swapped on completion.""" """Reactions should be bracketed around actual processing via hooks."""
adapter._app.client.reactions_add = AsyncMock() adapter._app.client.reactions_add = AsyncMock()
adapter._app.client.reactions_remove = AsyncMock() adapter._app.client.reactions_remove = AsyncMock()
adapter._app.client.users_info = AsyncMock(return_value={ adapter._app.client.users_info = AsyncMock(return_value={
@ -1047,15 +1047,97 @@ class TestReactions:
} }
await adapter._handle_slack_message(event) await adapter._handle_slack_message(event)
# Should have added 👀, then removed 👀, then added ✅ # _handle_slack_message should register the message for reactions
assert "1234567890.000001" in adapter._reacting_message_ids
# Simulate the base class calling on_processing_start
from gateway.platforms.base import MessageEvent, MessageType, SessionSource
from gateway.config import Platform
source = SessionSource(
platform=Platform.SLACK,
chat_id="C123",
chat_type="dm",
user_id="U_USER",
)
msg_event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
message_id="1234567890.000001",
)
await adapter.on_processing_start(msg_event)
add_calls = adapter._app.client.reactions_add.call_args_list
assert len(add_calls) == 1
assert add_calls[0].kwargs["name"] == "eyes"
# Simulate the base class calling on_processing_complete
from gateway.platforms.base import ProcessingOutcome
await adapter.on_processing_complete(msg_event, ProcessingOutcome.SUCCESS)
add_calls = adapter._app.client.reactions_add.call_args_list add_calls = adapter._app.client.reactions_add.call_args_list
remove_calls = adapter._app.client.reactions_remove.call_args_list remove_calls = adapter._app.client.reactions_remove.call_args_list
assert len(add_calls) == 2 assert len(add_calls) == 2
assert add_calls[0].kwargs["name"] == "eyes"
assert add_calls[1].kwargs["name"] == "white_check_mark" assert add_calls[1].kwargs["name"] == "white_check_mark"
assert len(remove_calls) == 1 assert len(remove_calls) == 1
assert remove_calls[0].kwargs["name"] == "eyes" assert remove_calls[0].kwargs["name"] == "eyes"
# Message ID should be cleaned up
assert "1234567890.000001" not in adapter._reacting_message_ids
@pytest.mark.asyncio
async def test_reactions_failure_outcome(self, adapter):
"""Failed processing should add :x: instead of :white_check_mark:."""
adapter._app.client.reactions_add = AsyncMock()
adapter._app.client.reactions_remove = AsyncMock()
from gateway.platforms.base import MessageEvent, MessageType, SessionSource, ProcessingOutcome
from gateway.config import Platform
source = SessionSource(
platform=Platform.SLACK,
chat_id="C123",
chat_type="dm",
user_id="U_USER",
)
adapter._reacting_message_ids.add("1234567890.000002")
msg_event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
message_id="1234567890.000002",
)
await adapter.on_processing_complete(msg_event, ProcessingOutcome.FAILURE)
add_calls = adapter._app.client.reactions_add.call_args_list
remove_calls = adapter._app.client.reactions_remove.call_args_list
assert len(add_calls) == 1
assert add_calls[0].kwargs["name"] == "x"
assert len(remove_calls) == 1
assert remove_calls[0].kwargs["name"] == "eyes"
@pytest.mark.asyncio
async def test_reactions_skipped_for_non_dm_non_mention(self, adapter):
"""Non-DM, non-mention messages should not get reactions."""
adapter._app.client.reactions_add = AsyncMock()
adapter._app.client.reactions_remove = AsyncMock()
adapter._app.client.users_info = AsyncMock(return_value={
"user": {"profile": {"display_name": "Tyler"}}
})
event = {
"text": "hello",
"user": "U_USER",
"channel": "C123",
"channel_type": "channel",
"ts": "1234567890.000003",
}
await adapter._handle_slack_message(event)
# Should NOT register for reactions when not mentioned in a channel
assert "1234567890.000003" not in adapter._reacting_message_ids
adapter._app.client.reactions_add.assert_not_called()
adapter._app.client.reactions_remove.assert_not_called()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# TestThreadReplyHandling # TestThreadReplyHandling