diff --git a/gateway/run.py b/gateway/run.py index 6adb98b8e20..cf30b6ffb95 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4465,10 +4465,15 @@ class GatewayRunner: # Drain any recovered process watchers (from crash recovery checkpoint) try: from tools.process_registry import process_registry - while process_registry.pending_watchers: - watcher = process_registry.pending_watchers.pop(0) + watchers = process_registry.pending_watchers + # Process in batches of 100 with event-loop yield points to avoid + # O(n^2) event-loop blocking when recovering thousands of watchers. + for i, watcher in enumerate(watchers): asyncio.create_task(self._run_process_watcher(watcher)) logger.info("Resumed watcher for recovered process %s", watcher.get("session_id")) + if i % 100 == 99: + await asyncio.sleep(0) + watchers.clear() except Exception as e: logger.error("Recovered watcher setup error: %s", e) @@ -7938,7 +7943,7 @@ class GatewayRunner: result = await result return str(result) if result else None except Exception as e: - logger.debug("Plugin command dispatch failed (non-fatal): %s", e) + logger.warning("Plugin command dispatch failed: %s", e) # Skill slash commands: /skill-name loads the skill and sends to agent. # resolve_skill_command_key() handles the Telegram underscore/hyphen @@ -7970,7 +7975,7 @@ class GatewayRunner: ) # Fall through to normal message processing with bundle content except Exception as exc: - logger.debug("Bundle dispatch failed (non-fatal): %s", exc) + logger.warning("Bundle dispatch failed: %s", exc) if command and not locals().get("_bundle_handled", False): try: @@ -9161,9 +9166,12 @@ class GatewayRunner: # Check for pending process watchers (check_interval on background processes) try: from tools.process_registry import process_registry - while process_registry.pending_watchers: - watcher = process_registry.pending_watchers.pop(0) + watchers = process_registry.pending_watchers + for i, watcher in enumerate(watchers): asyncio.create_task(self._run_process_watcher(watcher)) + if i % 100 == 99: + await asyncio.sleep(0) + watchers.clear() except Exception as e: logger.error("Process watcher setup error: %s", e)