diff --git a/plugins/memory/honcho/session.py b/plugins/memory/honcho/session.py index 55b9e0d187..8e7018d436 100644 --- a/plugins/memory/honcho/session.py +++ b/plugins/memory/honcho/session.py @@ -95,6 +95,7 @@ class HonchoSessionManager: self._config = config self._runtime_user_peer_name = runtime_user_peer_name self._cache: dict[str, HonchoSession] = {} + self._cache_lock = threading.RLock() self._peers_cache: dict[str, Any] = {} self._sessions_cache: dict[str, Any] = {} @@ -273,10 +274,12 @@ class HonchoSessionManager: Returns: The session. """ - if key in self._cache: - logger.debug("Local session cache hit: %s", key) - return self._cache[key] + with self._cache_lock: + if key in self._cache: + logger.debug("Local session cache hit: %s", key) + return self._cache[key] + # Determine peer IDs — no lock needed (read-only, no shared state mutation). # Gateway sessions normally use the runtime user identity (the # platform-native ID: Telegram UID, Discord snowflake, Slack user, # etc.) so multi-user bots scope memory per user. For a single-user @@ -300,7 +303,6 @@ class HonchoSessionManager: elif self._config and self._config.peer_name: user_peer_id = self._sanitize_id(self._config.peer_name) else: - # Fallback: derive from session key parts = key.split(":", 1) channel = parts[0] if len(parts) > 1 else "default" chat_id = parts[1] if len(parts) > 1 else key @@ -310,19 +312,14 @@ class HonchoSessionManager: self._config.ai_peer if self._config else "hermes-assistant" ) - # Sanitize session ID for Honcho + # All expensive I/O outside the lock — Honcho's persistence is source of truth honcho_session_id = self._sanitize_id(key) - - # Get or create peers user_peer = self._get_or_create_peer(user_peer_id) assistant_peer = self._get_or_create_peer(assistant_peer_id) - - # Get or create Honcho session honcho_session, existing_messages = self._get_or_create_honcho_session( honcho_session_id, user_peer, assistant_peer ) - # Convert Honcho messages to local format local_messages = [] for msg in existing_messages: role = "assistant" if msg.peer_id == assistant_peer_id else "user" @@ -330,10 +327,9 @@ class HonchoSessionManager: "role": role, "content": msg.content, "timestamp": msg.created_at.isoformat() if msg.created_at else "", - "_synced": True, # Already in Honcho + "_synced": True, }) - # Create local session wrapper with existing messages session = HonchoSession( key=key, user_peer_id=user_peer_id, @@ -342,7 +338,9 @@ class HonchoSessionManager: messages=local_messages, ) - self._cache[key] = session + # Write to cache under lock — only one writer wins + with self._cache_lock: + self._cache[key] = session return session def _flush_session(self, session: HonchoSession) -> bool: @@ -373,13 +371,15 @@ class HonchoSessionManager: for msg in new_messages: msg["_synced"] = True logger.debug("Synced %d messages to Honcho for %s", len(honcho_messages), session.key) - self._cache[session.key] = session + with self._cache_lock: + self._cache[session.key] = session return True except Exception as e: for msg in new_messages: msg["_synced"] = False logger.error("Failed to sync messages to Honcho: %s", e) - self._cache[session.key] = session + with self._cache_lock: + self._cache[session.key] = session return False def _async_writer_loop(self) -> None: @@ -451,7 +451,9 @@ class HonchoSessionManager: Called at session end for "session" write_frequency, or to force a sync before process exit regardless of mode. """ - for session in list(self._cache.values()): + with self._cache_lock: + sessions = list(self._cache.values()) + for session in sessions: try: self._flush_session(session) except Exception as e: @@ -476,9 +478,10 @@ class HonchoSessionManager: def delete(self, key: str) -> bool: """Delete a session from local cache.""" - if key in self._cache: - del self._cache[key] - return True + with self._cache_lock: + if key in self._cache: + del self._cache[key] + return True return False def new_session(self, key: str) -> HonchoSession: @@ -490,20 +493,22 @@ class HonchoSessionManager: """ import time - # Remove old session from caches (but don't delete from Honcho) - old_session = self._cache.pop(key, None) - if old_session: - self._sessions_cache.pop(old_session.honcho_session_id, None) + with self._cache_lock: + # Remove old session from caches (but don't delete from Honcho) + old_session = self._cache.pop(key, None) + if old_session: + self._sessions_cache.pop(old_session.honcho_session_id, None) - # Create new session with timestamp suffix - timestamp = int(time.time()) - new_key = f"{key}:{timestamp}" + # Create new session with timestamp suffix + timestamp = int(time.time()) + new_key = f"{key}:{timestamp}" # get_or_create will create a fresh session session = self.get_or_create(new_key) # Cache under the original key so callers find it by the expected name - self._cache[key] = session + with self._cache_lock: + self._cache[key] = session logger.info("Created new session for %s (honcho: %s)", key, session.honcho_session_id) return session