fix(gateway): detach pending_watchers batch + normalize LRU caches + align test fixtures + AUTHOR_MAP

Self-review follow-up on top of the salvaged perf fixes:

- gateway/run.py (both watcher-drain sites): the salvaged O(n^2) fix
  (#32708) replaced `while pending_watchers: pop(0)` with iterate-then-
  `watchers.clear()`, but `watchers` aliased the registry's live list.
  A watcher appended by a concurrent session during the `await
  asyncio.sleep(0)` yield would be cleared without ever being scheduled.
  Detach the batch atomically (`pending_watchers = []`) before iterating.

- gateway/platforms/bluebubbles.py: normalize the salvaged _guid_cache
  LRU (#30523) to match feishu/codebase precedent — module-level
  `_GUID_CACHE_SIZE` constant, `while len > cap`, and drop the redundant
  post-insert `move_to_end` (a fresh insert is already most-recent).

- gateway/platforms/feishu.py: drop the same redundant post-insert
  `move_to_end` from the salvaged _message_text_cache LRU (#23706).

- scripts/release.py: add AUTHOR_MAP entries for the salvaged commits'
  authors (amathxbt #22155, ErnestHysa #32636/#32708) so the contributor
  audit passes when these commits land on main.

- tests/tools/test_tool_output_limits.py: autouse fixture resets the new
  module-level limits cache between tests.

- tests/gateway/test_feishu.py: hand-built adapter fixture seeded
  _message_text_cache as a plain dict; it's now an OrderedDict, so the
  fixture type had to match.
This commit is contained in:
kshitijk4poor 2026-05-31 12:50:59 +05:30 committed by kshitij
parent 0036c72923
commit 32899279a7
6 changed files with 28 additions and 9 deletions

View file

@ -61,6 +61,8 @@ _MESSAGE_EVENTS = {"new-message", "message", "updated-message"}
_PHONE_RE = re.compile(r"\+?\d{7,15}")
_EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")
_GUID_CACHE_SIZE = 500 # LRU cap for resolved chat-GUID lookups
def _redact(text: str) -> str:
"""Redact phone numbers and emails from log output."""
@ -130,7 +132,6 @@ class BlueBubblesAdapter(BasePlatformAdapter):
self._private_api_enabled: Optional[bool] = None
self._helper_connected: bool = False
self._guid_cache: OrderedDict[str, str] = OrderedDict()
self._GUID_CACHE_MAX = 500
# ------------------------------------------------------------------
# API helpers
@ -380,15 +381,13 @@ class BlueBubblesAdapter(BasePlatformAdapter):
if identifier == target:
if guid:
self._guid_cache[target] = guid
self._guid_cache.move_to_end(target)
if len(self._guid_cache) > self._GUID_CACHE_MAX:
while len(self._guid_cache) > _GUID_CACHE_SIZE:
self._guid_cache.popitem(last=False)
return guid
for part in chat.get("participants", []) or []:
if (part.get("address") or "").strip() == target and guid:
self._guid_cache[target] = guid
self._guid_cache.move_to_end(target)
if len(self._guid_cache) > self._GUID_CACHE_MAX:
while len(self._guid_cache) > _GUID_CACHE_SIZE:
self._guid_cache.popitem(last=False)
return guid
except Exception:

View file

@ -3982,7 +3982,6 @@ class FeishuAdapter(BasePlatformAdapter):
mentions=parent_mentions,
)
self._message_text_cache[message_id] = text
self._message_text_cache.move_to_end(message_id)
while len(self._message_text_cache) > _FEISHU_MESSAGE_TEXT_CACHE_SIZE:
self._message_text_cache.popitem(last=False)
return text

View file

@ -4465,7 +4465,12 @@ class GatewayRunner:
# Drain any recovered process watchers (from crash recovery checkpoint)
try:
from tools.process_registry import process_registry
# Detach the current batch atomically: reassigning to a fresh list
# takes ownership of exactly the watchers present now, so any watcher
# appended concurrently during the yield below isn't silently dropped
# by a clear() on the shared list.
watchers = process_registry.pending_watchers
process_registry.pending_watchers = []
# Process in batches of 100 with event-loop yield points to avoid
# O(n^2) event-loop blocking when recovering thousands of watchers.
for i, watcher in enumerate(watchers):
@ -4473,7 +4478,6 @@ class GatewayRunner:
logger.info("Resumed watcher for recovered process %s", watcher.get("session_id"))
if i % 100 == 99:
await asyncio.sleep(0)
watchers.clear()
except Exception as e:
logger.error("Recovered watcher setup error: %s", e)
@ -9166,12 +9170,15 @@ class GatewayRunner:
# Check for pending process watchers (check_interval on background processes)
try:
from tools.process_registry import process_registry
# Detach the current batch atomically (see crash-recovery drain
# above): reassign to a fresh list so a watcher appended by a
# concurrent session during the yield isn't dropped by clear().
watchers = process_registry.pending_watchers
process_registry.pending_watchers = []
for i, watcher in enumerate(watchers):
asyncio.create_task(self._run_process_watcher(watcher))
if i % 100 == 99:
await asyncio.sleep(0)
watchers.clear()
except Exception as e:
logger.error("Process watcher setup error: %s", e)

View file

@ -1406,6 +1406,9 @@ AUTHOR_MAP = {
"peter.yuqin@gmail.com": "WuKongAI-CMU", # PR #10082 (reject symlinked audio inputs)
"sunil.nitie@gmail.com": "Sunil123135", # PR #31031 (Windows Docker Desktop compose)
"weichangyuwcy@gmail.com": "ChyuWei", # PR #30987 (TUI TTS env var on voice off)
# batch salvage PR #35758 (perf micro-fixes)
"116212274+amathxbt@users.noreply.github.com": "amathxbt", # PR #22155 (cache tool_output_limits)
"takis312@hotmail.com": "ErnestHysa", # PRs #32636/#32708 (MCP asyncio.sleep + O(n^2) watcher drain)
}

View file

@ -6,6 +6,7 @@ import os
import tempfile
import time
import unittest
from collections import OrderedDict
from pathlib import Path
from types import SimpleNamespace
from typing import Dict
@ -4603,7 +4604,7 @@ class TestFeishuFetchMessageText(unittest.TestCase):
adapter._bot_open_id = "ou_bot"
adapter._bot_user_id = ""
adapter._bot_name = "Hermes"
adapter._message_text_cache = {}
adapter._message_text_cache = OrderedDict()
adapter._client = Mock()
adapter._build_get_message_request = Mock(return_value=object())
return adapter

View file

@ -22,6 +22,16 @@ import pytest
from tools import tool_output_limits as tol
@pytest.fixture(autouse=True)
def _reset_limits_cache():
"""get_tool_output_limits() now memoizes its result for the process
lifetime, so each test must start from a clean cache to observe the
config value it patches in."""
tol._reset_tool_output_limits_cache()
yield
tol._reset_tool_output_limits_cache()
class TestDefaults:
def test_defaults_match_previous_hardcoded_values(self):
assert tol.DEFAULT_MAX_BYTES == 50_000