fix(gateway): upgrade plugin/bundle error logging and fix O(n^2) watcher recovery

N43 — Silent plugin/bundle errors:
- Plugin command dispatch: logger.debug() -> logger.warning()
- Bundle dispatch: logger.debug() -> logger.warning()
Plugin/auth failures are no longer invisible to operators.

N42 — O(n^2) pending_watchers recovery:
- Both recovery loops (startup + per-message) used while+pop(0) which is O(n) per pop
- Replaced with enumerate() over the list + periodic asyncio.sleep(0) yield points
- Clears the list after iteration instead of per-pop
- Batch size of 100 balances throughput vs event-loop responsiveness
This commit is contained in:
ErnestHysa 2026-05-26 17:32:16 +01:00 committed by kshitij
parent eb9bfd3924
commit 0036c72923

View file

@ -4465,10 +4465,15 @@ class GatewayRunner:
# Drain any recovered process watchers (from crash recovery checkpoint)
try:
from tools.process_registry import process_registry
while process_registry.pending_watchers:
watcher = process_registry.pending_watchers.pop(0)
watchers = process_registry.pending_watchers
# Process in batches of 100 with event-loop yield points to avoid
# O(n^2) event-loop blocking when recovering thousands of watchers.
for i, watcher in enumerate(watchers):
asyncio.create_task(self._run_process_watcher(watcher))
logger.info("Resumed watcher for recovered process %s", watcher.get("session_id"))
if i % 100 == 99:
await asyncio.sleep(0)
watchers.clear()
except Exception as e:
logger.error("Recovered watcher setup error: %s", e)
@ -7938,7 +7943,7 @@ class GatewayRunner:
result = await result
return str(result) if result else None
except Exception as e:
logger.debug("Plugin command dispatch failed (non-fatal): %s", e)
logger.warning("Plugin command dispatch failed: %s", e)
# Skill slash commands: /skill-name loads the skill and sends to agent.
# resolve_skill_command_key() handles the Telegram underscore/hyphen
@ -7970,7 +7975,7 @@ class GatewayRunner:
)
# Fall through to normal message processing with bundle content
except Exception as exc:
logger.debug("Bundle dispatch failed (non-fatal): %s", exc)
logger.warning("Bundle dispatch failed: %s", exc)
if command and not locals().get("_bundle_handled", False):
try:
@ -9161,9 +9166,12 @@ class GatewayRunner:
# Check for pending process watchers (check_interval on background processes)
try:
from tools.process_registry import process_registry
while process_registry.pending_watchers:
watcher = process_registry.pending_watchers.pop(0)
watchers = process_registry.pending_watchers
for i, watcher in enumerate(watchers):
asyncio.create_task(self._run_process_watcher(watcher))
if i % 100 == 99:
await asyncio.sleep(0)
watchers.clear()
except Exception as e:
logger.error("Process watcher setup error: %s", e)