From 32899279a744805350be891ccf3ae08289efc702 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Sun, 31 May 2026 12:50:59 +0530 Subject: [PATCH] fix(gateway): detach pending_watchers batch + normalize LRU caches + align test fixtures + AUTHOR_MAP MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Self-review follow-up on top of the salvaged perf fixes: - gateway/run.py (both watcher-drain sites): the salvaged O(n^2) fix (#32708) replaced `while pending_watchers: pop(0)` with iterate-then- `watchers.clear()`, but `watchers` aliased the registry's live list. A watcher appended by a concurrent session during the `await asyncio.sleep(0)` yield would be cleared without ever being scheduled. Detach the batch atomically (`pending_watchers = []`) before iterating. - gateway/platforms/bluebubbles.py: normalize the salvaged _guid_cache LRU (#30523) to match feishu/codebase precedent — module-level `_GUID_CACHE_SIZE` constant, `while len > cap`, and drop the redundant post-insert `move_to_end` (a fresh insert is already most-recent). - gateway/platforms/feishu.py: drop the same redundant post-insert `move_to_end` from the salvaged _message_text_cache LRU (#23706). - scripts/release.py: add AUTHOR_MAP entries for the salvaged commits' authors (amathxbt #22155, ErnestHysa #32636/#32708) so the contributor audit passes when these commits land on main. - tests/tools/test_tool_output_limits.py: autouse fixture resets the new module-level limits cache between tests. - tests/gateway/test_feishu.py: hand-built adapter fixture seeded _message_text_cache as a plain dict; it's now an OrderedDict, so the fixture type had to match. --- gateway/platforms/bluebubbles.py | 9 ++++----- gateway/platforms/feishu.py | 1 - gateway/run.py | 11 +++++++++-- scripts/release.py | 3 +++ tests/gateway/test_feishu.py | 3 ++- tests/tools/test_tool_output_limits.py | 10 ++++++++++ 6 files changed, 28 insertions(+), 9 deletions(-) diff --git a/gateway/platforms/bluebubbles.py b/gateway/platforms/bluebubbles.py index f02adbbaa68..2fc4102b666 100644 --- a/gateway/platforms/bluebubbles.py +++ b/gateway/platforms/bluebubbles.py @@ -61,6 +61,8 @@ _MESSAGE_EVENTS = {"new-message", "message", "updated-message"} _PHONE_RE = re.compile(r"\+?\d{7,15}") _EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+") +_GUID_CACHE_SIZE = 500 # LRU cap for resolved chat-GUID lookups + def _redact(text: str) -> str: """Redact phone numbers and emails from log output.""" @@ -130,7 +132,6 @@ class BlueBubblesAdapter(BasePlatformAdapter): self._private_api_enabled: Optional[bool] = None self._helper_connected: bool = False self._guid_cache: OrderedDict[str, str] = OrderedDict() - self._GUID_CACHE_MAX = 500 # ------------------------------------------------------------------ # API helpers @@ -380,15 +381,13 @@ class BlueBubblesAdapter(BasePlatformAdapter): if identifier == target: if guid: self._guid_cache[target] = guid - self._guid_cache.move_to_end(target) - if len(self._guid_cache) > self._GUID_CACHE_MAX: + while len(self._guid_cache) > _GUID_CACHE_SIZE: self._guid_cache.popitem(last=False) return guid for part in chat.get("participants", []) or []: if (part.get("address") or "").strip() == target and guid: self._guid_cache[target] = guid - self._guid_cache.move_to_end(target) - if len(self._guid_cache) > self._GUID_CACHE_MAX: + while len(self._guid_cache) > _GUID_CACHE_SIZE: self._guid_cache.popitem(last=False) return guid except Exception: diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py index 2366b65ac74..12ad62b5a7e 100644 --- a/gateway/platforms/feishu.py +++ b/gateway/platforms/feishu.py @@ -3982,7 +3982,6 @@ class FeishuAdapter(BasePlatformAdapter): mentions=parent_mentions, ) self._message_text_cache[message_id] = text - self._message_text_cache.move_to_end(message_id) while len(self._message_text_cache) > _FEISHU_MESSAGE_TEXT_CACHE_SIZE: self._message_text_cache.popitem(last=False) return text diff --git a/gateway/run.py b/gateway/run.py index cf30b6ffb95..55d1131f10e 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4465,7 +4465,12 @@ class GatewayRunner: # Drain any recovered process watchers (from crash recovery checkpoint) try: from tools.process_registry import process_registry + # Detach the current batch atomically: reassigning to a fresh list + # takes ownership of exactly the watchers present now, so any watcher + # appended concurrently during the yield below isn't silently dropped + # by a clear() on the shared list. watchers = process_registry.pending_watchers + process_registry.pending_watchers = [] # Process in batches of 100 with event-loop yield points to avoid # O(n^2) event-loop blocking when recovering thousands of watchers. for i, watcher in enumerate(watchers): @@ -4473,7 +4478,6 @@ class GatewayRunner: logger.info("Resumed watcher for recovered process %s", watcher.get("session_id")) if i % 100 == 99: await asyncio.sleep(0) - watchers.clear() except Exception as e: logger.error("Recovered watcher setup error: %s", e) @@ -9166,12 +9170,15 @@ class GatewayRunner: # Check for pending process watchers (check_interval on background processes) try: from tools.process_registry import process_registry + # Detach the current batch atomically (see crash-recovery drain + # above): reassign to a fresh list so a watcher appended by a + # concurrent session during the yield isn't dropped by clear(). watchers = process_registry.pending_watchers + process_registry.pending_watchers = [] for i, watcher in enumerate(watchers): asyncio.create_task(self._run_process_watcher(watcher)) if i % 100 == 99: await asyncio.sleep(0) - watchers.clear() except Exception as e: logger.error("Process watcher setup error: %s", e) diff --git a/scripts/release.py b/scripts/release.py index 30d0d84d6a0..c5d8047d849 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -1406,6 +1406,9 @@ AUTHOR_MAP = { "peter.yuqin@gmail.com": "WuKongAI-CMU", # PR #10082 (reject symlinked audio inputs) "sunil.nitie@gmail.com": "Sunil123135", # PR #31031 (Windows Docker Desktop compose) "weichangyuwcy@gmail.com": "ChyuWei", # PR #30987 (TUI TTS env var on voice off) + # batch salvage PR #35758 (perf micro-fixes) + "116212274+amathxbt@users.noreply.github.com": "amathxbt", # PR #22155 (cache tool_output_limits) + "takis312@hotmail.com": "ErnestHysa", # PRs #32636/#32708 (MCP asyncio.sleep + O(n^2) watcher drain) } diff --git a/tests/gateway/test_feishu.py b/tests/gateway/test_feishu.py index 0f65fd052be..56770f55d76 100644 --- a/tests/gateway/test_feishu.py +++ b/tests/gateway/test_feishu.py @@ -6,6 +6,7 @@ import os import tempfile import time import unittest +from collections import OrderedDict from pathlib import Path from types import SimpleNamespace from typing import Dict @@ -4603,7 +4604,7 @@ class TestFeishuFetchMessageText(unittest.TestCase): adapter._bot_open_id = "ou_bot" adapter._bot_user_id = "" adapter._bot_name = "Hermes" - adapter._message_text_cache = {} + adapter._message_text_cache = OrderedDict() adapter._client = Mock() adapter._build_get_message_request = Mock(return_value=object()) return adapter diff --git a/tests/tools/test_tool_output_limits.py b/tests/tools/test_tool_output_limits.py index 19fa3fc05a1..b18f7f3ad0b 100644 --- a/tests/tools/test_tool_output_limits.py +++ b/tests/tools/test_tool_output_limits.py @@ -22,6 +22,16 @@ import pytest from tools import tool_output_limits as tol +@pytest.fixture(autouse=True) +def _reset_limits_cache(): + """get_tool_output_limits() now memoizes its result for the process + lifetime, so each test must start from a clean cache to observe the + config value it patches in.""" + tol._reset_tool_output_limits_cache() + yield + tol._reset_tool_output_limits_cache() + + class TestDefaults: def test_defaults_match_previous_hardcoded_values(self): assert tol.DEFAULT_MAX_BYTES == 50_000