diff --git a/gateway/platforms/slack.py b/gateway/platforms/slack.py index feb08e494..b4973bbbd 100644 --- a/gateway/platforms/slack.py +++ b/gateway/platforms/slack.py @@ -14,6 +14,7 @@ import logging import os import re import time +from dataclasses import dataclass, field from typing import Dict, Optional, Any, Tuple try: @@ -45,6 +46,14 @@ from gateway.platforms.base import ( logger = logging.getLogger(__name__) +@dataclass +class _ThreadContextCache: + """Cache entry for fetched thread context.""" + content: str + fetched_at: float = field(default_factory=time.monotonic) + message_count: int = 0 + + def check_slack_requirements() -> bool: """Check if Slack dependencies are available.""" return SLACK_AVAILABLE @@ -101,6 +110,9 @@ class SlackAdapter(BasePlatformAdapter): # session + memory scoping. self._assistant_threads: Dict[Tuple[str, str], Dict[str, str]] = {} self._ASSISTANT_THREADS_MAX = 5000 + # Cache for _fetch_thread_context results: cache_key → _ThreadContextCache + self._thread_context_cache: Dict[str, _ThreadContextCache] = {} + self._THREAD_CACHE_TTL = 60.0 async def connect(self) -> bool: """Connect to Slack via Socket Mode.""" @@ -1377,57 +1389,104 @@ class SlackAdapter(BasePlatformAdapter): """Fetch recent thread messages to provide context when the bot is mentioned mid-thread for the first time. - Returns a formatted string with thread history, or empty string on - failure or if the thread is empty (just the parent message). + This method is only called when there is NO active session for the + thread (guarded at the call site by _has_active_session_for_thread). + That guard ensures thread messages are prepended only on the very + first turn — after that the session history already holds them, so + there is no duplication across subsequent turns. + + Results are cached for _THREAD_CACHE_TTL seconds per thread to avoid + hammering conversations.replies (Tier 3, ~50 req/min). + + Returns a formatted string with prior thread history, or empty string + on failure or if the thread has no prior messages. """ + cache_key = f"{channel_id}:{thread_ts}" + now = time.monotonic() + cached = self._thread_context_cache.get(cache_key) + if cached and (now - cached.fetched_at) < self._THREAD_CACHE_TTL: + return cached.content + try: client = self._get_client(channel_id) - result = await client.conversations_replies( - channel=channel_id, - ts=thread_ts, - limit=limit + 1, # +1 because it includes the current message - inclusive=True, - ) + + # Retry with exponential backoff for Tier-3 rate limits (429). + result = None + for attempt in range(3): + try: + result = await client.conversations_replies( + channel=channel_id, + ts=thread_ts, + limit=limit + 1, # +1 because it includes the current message + inclusive=True, + ) + break + except Exception as exc: + # Check for rate-limit error from slack_sdk + err_str = str(exc).lower() + is_rate_limit = ( + "ratelimited" in err_str + or "429" in err_str + or "rate_limited" in err_str + ) + if is_rate_limit and attempt < 2: + retry_after = 1.0 * (2 ** attempt) # 1s, 2s + logger.warning( + "[Slack] conversations.replies rate limited; retrying in %.1fs (attempt %d/3)", + retry_after, attempt + 1, + ) + await asyncio.sleep(retry_after) + continue + raise + + if result is None: + return "" + messages = result.get("messages", []) if not messages: return "" + bot_uid = self._team_bot_user_ids.get(team_id, self._bot_user_id) context_parts = [] for msg in messages: msg_ts = msg.get("ts", "") - # Skip the current message (the one that triggered this fetch) + # Exclude the current triggering message — it will be delivered + # as the user message itself, so including it here would duplicate it. if msg_ts == current_ts: continue - # Skip bot messages from ourselves + # Exclude our own bot messages to avoid circular context. if msg.get("bot_id") or msg.get("subtype") == "bot_message": continue - msg_user = msg.get("user", "unknown") msg_text = msg.get("text", "").strip() if not msg_text: continue # Strip bot mentions from context messages - bot_uid = self._team_bot_user_ids.get(team_id, self._bot_user_id) if bot_uid: msg_text = msg_text.replace(f"<@{bot_uid}>", "").strip() - # Mark the thread parent + msg_user = msg.get("user", "unknown") is_parent = msg_ts == thread_ts prefix = "[thread parent] " if is_parent else "" - - # Resolve user name (cached) name = await self._resolve_user_name(msg_user, chat_id=channel_id) context_parts.append(f"{prefix}{name}: {msg_text}") - if not context_parts: - return "" + content = "" + if context_parts: + content = ( + "[Thread context — prior messages in this thread (not yet in conversation history):]\n" + + "\n".join(context_parts) + + "\n[End of thread context]\n\n" + ) - return ( - "[Thread context — previous messages in this thread:]\n" - + "\n".join(context_parts) - + "\n[End of thread context]\n\n" + self._thread_context_cache[cache_key] = _ThreadContextCache( + content=content, + fetched_at=now, + message_count=len(context_parts), ) + return content + except Exception as e: logger.warning("[Slack] Failed to fetch thread context: %s", e) return ""