mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(slack): add rate-limit retry and TTL cache to thread context fetching
- Add _ThreadContextCache dataclass for caching fetched context (60s TTL) - Add exponential backoff retry for conversations.replies 429 rate limits (Tier 3, ~50 req/min) - Only fetch context when no active session exists (guard at call site) to prevent duplication across turns - Hoist bot_uid lookup outside the per-message loop - Clearer header text for injected thread context Based on PR #6162 by jarvisxyz, cherry-picked onto current main.
This commit is contained in:
parent
18d8e91a5a
commit
88845b99d2
1 changed files with 80 additions and 21 deletions
|
|
@ -14,6 +14,7 @@ import logging
|
|||
import os
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, Optional, Any, Tuple
|
||||
|
||||
try:
|
||||
|
|
@ -45,6 +46,14 @@ from gateway.platforms.base import (
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class _ThreadContextCache:
|
||||
"""Cache entry for fetched thread context."""
|
||||
content: str
|
||||
fetched_at: float = field(default_factory=time.monotonic)
|
||||
message_count: int = 0
|
||||
|
||||
|
||||
def check_slack_requirements() -> bool:
|
||||
"""Check if Slack dependencies are available."""
|
||||
return SLACK_AVAILABLE
|
||||
|
|
@ -101,6 +110,9 @@ class SlackAdapter(BasePlatformAdapter):
|
|||
# session + memory scoping.
|
||||
self._assistant_threads: Dict[Tuple[str, str], Dict[str, str]] = {}
|
||||
self._ASSISTANT_THREADS_MAX = 5000
|
||||
# Cache for _fetch_thread_context results: cache_key → _ThreadContextCache
|
||||
self._thread_context_cache: Dict[str, _ThreadContextCache] = {}
|
||||
self._THREAD_CACHE_TTL = 60.0
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to Slack via Socket Mode."""
|
||||
|
|
@ -1377,57 +1389,104 @@ class SlackAdapter(BasePlatformAdapter):
|
|||
"""Fetch recent thread messages to provide context when the bot is
|
||||
mentioned mid-thread for the first time.
|
||||
|
||||
Returns a formatted string with thread history, or empty string on
|
||||
failure or if the thread is empty (just the parent message).
|
||||
This method is only called when there is NO active session for the
|
||||
thread (guarded at the call site by _has_active_session_for_thread).
|
||||
That guard ensures thread messages are prepended only on the very
|
||||
first turn — after that the session history already holds them, so
|
||||
there is no duplication across subsequent turns.
|
||||
|
||||
Results are cached for _THREAD_CACHE_TTL seconds per thread to avoid
|
||||
hammering conversations.replies (Tier 3, ~50 req/min).
|
||||
|
||||
Returns a formatted string with prior thread history, or empty string
|
||||
on failure or if the thread has no prior messages.
|
||||
"""
|
||||
cache_key = f"{channel_id}:{thread_ts}"
|
||||
now = time.monotonic()
|
||||
cached = self._thread_context_cache.get(cache_key)
|
||||
if cached and (now - cached.fetched_at) < self._THREAD_CACHE_TTL:
|
||||
return cached.content
|
||||
|
||||
try:
|
||||
client = self._get_client(channel_id)
|
||||
result = await client.conversations_replies(
|
||||
channel=channel_id,
|
||||
ts=thread_ts,
|
||||
limit=limit + 1, # +1 because it includes the current message
|
||||
inclusive=True,
|
||||
)
|
||||
|
||||
# Retry with exponential backoff for Tier-3 rate limits (429).
|
||||
result = None
|
||||
for attempt in range(3):
|
||||
try:
|
||||
result = await client.conversations_replies(
|
||||
channel=channel_id,
|
||||
ts=thread_ts,
|
||||
limit=limit + 1, # +1 because it includes the current message
|
||||
inclusive=True,
|
||||
)
|
||||
break
|
||||
except Exception as exc:
|
||||
# Check for rate-limit error from slack_sdk
|
||||
err_str = str(exc).lower()
|
||||
is_rate_limit = (
|
||||
"ratelimited" in err_str
|
||||
or "429" in err_str
|
||||
or "rate_limited" in err_str
|
||||
)
|
||||
if is_rate_limit and attempt < 2:
|
||||
retry_after = 1.0 * (2 ** attempt) # 1s, 2s
|
||||
logger.warning(
|
||||
"[Slack] conversations.replies rate limited; retrying in %.1fs (attempt %d/3)",
|
||||
retry_after, attempt + 1,
|
||||
)
|
||||
await asyncio.sleep(retry_after)
|
||||
continue
|
||||
raise
|
||||
|
||||
if result is None:
|
||||
return ""
|
||||
|
||||
messages = result.get("messages", [])
|
||||
if not messages:
|
||||
return ""
|
||||
|
||||
bot_uid = self._team_bot_user_ids.get(team_id, self._bot_user_id)
|
||||
context_parts = []
|
||||
for msg in messages:
|
||||
msg_ts = msg.get("ts", "")
|
||||
# Skip the current message (the one that triggered this fetch)
|
||||
# Exclude the current triggering message — it will be delivered
|
||||
# as the user message itself, so including it here would duplicate it.
|
||||
if msg_ts == current_ts:
|
||||
continue
|
||||
# Skip bot messages from ourselves
|
||||
# Exclude our own bot messages to avoid circular context.
|
||||
if msg.get("bot_id") or msg.get("subtype") == "bot_message":
|
||||
continue
|
||||
|
||||
msg_user = msg.get("user", "unknown")
|
||||
msg_text = msg.get("text", "").strip()
|
||||
if not msg_text:
|
||||
continue
|
||||
|
||||
# Strip bot mentions from context messages
|
||||
bot_uid = self._team_bot_user_ids.get(team_id, self._bot_user_id)
|
||||
if bot_uid:
|
||||
msg_text = msg_text.replace(f"<@{bot_uid}>", "").strip()
|
||||
|
||||
# Mark the thread parent
|
||||
msg_user = msg.get("user", "unknown")
|
||||
is_parent = msg_ts == thread_ts
|
||||
prefix = "[thread parent] " if is_parent else ""
|
||||
|
||||
# Resolve user name (cached)
|
||||
name = await self._resolve_user_name(msg_user, chat_id=channel_id)
|
||||
context_parts.append(f"{prefix}{name}: {msg_text}")
|
||||
|
||||
if not context_parts:
|
||||
return ""
|
||||
content = ""
|
||||
if context_parts:
|
||||
content = (
|
||||
"[Thread context — prior messages in this thread (not yet in conversation history):]\n"
|
||||
+ "\n".join(context_parts)
|
||||
+ "\n[End of thread context]\n\n"
|
||||
)
|
||||
|
||||
return (
|
||||
"[Thread context — previous messages in this thread:]\n"
|
||||
+ "\n".join(context_parts)
|
||||
+ "\n[End of thread context]\n\n"
|
||||
self._thread_context_cache[cache_key] = _ThreadContextCache(
|
||||
content=content,
|
||||
fetched_at=now,
|
||||
message_count=len(context_parts),
|
||||
)
|
||||
return content
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("[Slack] Failed to fetch thread context: %s", e)
|
||||
return ""
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue