From 243ee67529ff58828ea1b4aac764be449472a4e7 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:36:24 -0700 Subject: [PATCH] fix: store asyncio task references to prevent GC mid-execution (#3267) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Python's asyncio event loop holds only weak references to tasks. Without a strong reference, the garbage collector can destroy a task while it's awaiting I/O — silently dropping messages. Python 3.12+ made this more aggressive. Audit of all gateway platform adapters found 6 untracked create_task calls across 6 files: Per-message tasks (tracked via _background_tasks set from base class): - gateway/platforms/webhook.py: handle_message task - gateway/platforms/sms.py: handle_message task - gateway/platforms/signal.py: SSE response aclose task Long-running infrastructure tasks (stored in named instance vars): - gateway/platforms/slack.py: Socket Mode handler (_socket_mode_task) - gateway/platforms/discord.py: bot client (_bot_task) - gateway/platforms/whatsapp.py: message poll loop (_poll_task, 2 sites) All other adapters (telegram, mattermost, matrix, email, homeassistant, dingtalk) already tracked their tasks correctly. Salvaged from PR #3160 by memosr — expanded from 1 file to 6. --- gateway/platforms/discord.py | 3 ++- gateway/platforms/signal.py | 4 +++- gateway/platforms/slack.py | 3 ++- gateway/platforms/sms.py | 4 +++- gateway/platforms/webhook.py | 4 +++- gateway/platforms/whatsapp.py | 5 +++-- 6 files changed, 16 insertions(+), 7 deletions(-) diff --git a/gateway/platforms/discord.py b/gateway/platforms/discord.py index b1a441940..cb5bab1fa 100644 --- a/gateway/platforms/discord.py +++ b/gateway/platforms/discord.py @@ -446,6 +446,7 @@ class DiscordAdapter(BasePlatformAdapter): # Persistent typing indicator loops per channel (DMs don't reliably # show the standard typing gateway event for bots) self._typing_tasks: Dict[str, asyncio.Task] = {} + self._bot_task: Optional[asyncio.Task] = None # Cap to prevent unbounded growth (Discord threads get archived). self._MAX_TRACKED_THREADS = 500 @@ -588,7 +589,7 @@ class DiscordAdapter(BasePlatformAdapter): self._register_slash_commands() # Start the bot in background - asyncio.create_task(self._client.start(self.config.token)) + self._bot_task = asyncio.create_task(self._client.start(self.config.token)) # Wait for ready await asyncio.wait_for(self._ready_event.wait(), timeout=30) diff --git a/gateway/platforms/signal.py b/gateway/platforms/signal.py index 79ccb551b..39c3814fb 100644 --- a/gateway/platforms/signal.py +++ b/gateway/platforms/signal.py @@ -344,7 +344,9 @@ class SignalAdapter(BasePlatformAdapter): """Force SSE reconnection by closing the current response.""" if self._sse_response and not self._sse_response.is_stream_consumed: try: - asyncio.create_task(self._sse_response.aclose()) + task = asyncio.create_task(self._sse_response.aclose()) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) except Exception: pass self._sse_response = None diff --git a/gateway/platforms/slack.py b/gateway/platforms/slack.py index 8746a34d8..e8163e26e 100644 --- a/gateway/platforms/slack.py +++ b/gateway/platforms/slack.py @@ -72,6 +72,7 @@ class SlackAdapter(BasePlatformAdapter): self._handler: Optional[AsyncSocketModeHandler] = None self._bot_user_id: Optional[str] = None self._user_name_cache: Dict[str, str] = {} # user_id → display name + self._socket_mode_task: Optional[asyncio.Task] = None async def connect(self) -> bool: """Connect to Slack via Socket Mode.""" @@ -119,7 +120,7 @@ class SlackAdapter(BasePlatformAdapter): # Start Socket Mode handler in background self._handler = AsyncSocketModeHandler(self._app, app_token) - asyncio.create_task(self._handler.start_async()) + self._socket_mode_task = asyncio.create_task(self._handler.start_async()) self._running = True logger.info("[Slack] Connected as @%s (Socket Mode)", bot_name) diff --git a/gateway/platforms/sms.py b/gateway/platforms/sms.py index 750821a4c..a0760199b 100644 --- a/gateway/platforms/sms.py +++ b/gateway/platforms/sms.py @@ -265,7 +265,9 @@ class SmsAdapter(BasePlatformAdapter): ) # Non-blocking: Twilio expects a fast response - asyncio.create_task(self.handle_message(event)) + task = asyncio.create_task(self.handle_message(event)) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) # Return empty TwiML — we send replies via the REST API, not inline TwiML return web.Response( diff --git a/gateway/platforms/webhook.py b/gateway/platforms/webhook.py index 4a4bbfbac..2d75879b5 100644 --- a/gateway/platforms/webhook.py +++ b/gateway/platforms/webhook.py @@ -363,7 +363,9 @@ class WebhookAdapter(BasePlatformAdapter): ) # Non-blocking — return 202 Accepted immediately - asyncio.create_task(self.handle_message(event)) + task = asyncio.create_task(self.handle_message(event)) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) return web.json_response( { diff --git a/gateway/platforms/whatsapp.py b/gateway/platforms/whatsapp.py index 8e3010b43..b83657401 100644 --- a/gateway/platforms/whatsapp.py +++ b/gateway/platforms/whatsapp.py @@ -140,6 +140,7 @@ class WhatsAppAdapter(BasePlatformAdapter): self._message_queue: asyncio.Queue = asyncio.Queue() self._bridge_log_fh = None self._bridge_log: Optional[Path] = None + self._poll_task: Optional[asyncio.Task] = None async def connect(self) -> bool: """ @@ -198,7 +199,7 @@ class WhatsAppAdapter(BasePlatformAdapter): print(f"[{self.name}] Using existing bridge (status: {bridge_status})") self._mark_connected() self._bridge_process = None # Not managed by us - asyncio.create_task(self._poll_messages()) + self._poll_task = asyncio.create_task(self._poll_messages()) return True else: print(f"[{self.name}] Bridge found but not connected (status: {bridge_status}), restarting") @@ -304,7 +305,7 @@ class WhatsAppAdapter(BasePlatformAdapter): print(f"[{self.name}] If session expired, re-pair: hermes whatsapp") # Start message polling task - asyncio.create_task(self._poll_messages()) + self._poll_task = asyncio.create_task(self._poll_messages()) self._mark_connected() print(f"[{self.name}] Bridge started on port {self._bridge_port}")