mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix: store asyncio task references to prevent GC mid-execution (#3267)
Python's asyncio event loop holds only weak references to tasks. Without a strong reference, the garbage collector can destroy a task while it's awaiting I/O — silently dropping messages. Python 3.12+ made this more aggressive. Audit of all gateway platform adapters found 6 untracked create_task calls across 6 files: Per-message tasks (tracked via _background_tasks set from base class): - gateway/platforms/webhook.py: handle_message task - gateway/platforms/sms.py: handle_message task - gateway/platforms/signal.py: SSE response aclose task Long-running infrastructure tasks (stored in named instance vars): - gateway/platforms/slack.py: Socket Mode handler (_socket_mode_task) - gateway/platforms/discord.py: bot client (_bot_task) - gateway/platforms/whatsapp.py: message poll loop (_poll_task, 2 sites) All other adapters (telegram, mattermost, matrix, email, homeassistant, dingtalk) already tracked their tasks correctly. Salvaged from PR #3160 by memosr — expanded from 1 file to 6.
This commit is contained in:
parent
3a86328847
commit
243ee67529
6 changed files with 16 additions and 7 deletions
|
|
@ -446,6 +446,7 @@ class DiscordAdapter(BasePlatformAdapter):
|
|||
# Persistent typing indicator loops per channel (DMs don't reliably
|
||||
# show the standard typing gateway event for bots)
|
||||
self._typing_tasks: Dict[str, asyncio.Task] = {}
|
||||
self._bot_task: Optional[asyncio.Task] = None
|
||||
# Cap to prevent unbounded growth (Discord threads get archived).
|
||||
self._MAX_TRACKED_THREADS = 500
|
||||
|
||||
|
|
@ -588,7 +589,7 @@ class DiscordAdapter(BasePlatformAdapter):
|
|||
self._register_slash_commands()
|
||||
|
||||
# Start the bot in background
|
||||
asyncio.create_task(self._client.start(self.config.token))
|
||||
self._bot_task = asyncio.create_task(self._client.start(self.config.token))
|
||||
|
||||
# Wait for ready
|
||||
await asyncio.wait_for(self._ready_event.wait(), timeout=30)
|
||||
|
|
|
|||
|
|
@ -344,7 +344,9 @@ class SignalAdapter(BasePlatformAdapter):
|
|||
"""Force SSE reconnection by closing the current response."""
|
||||
if self._sse_response and not self._sse_response.is_stream_consumed:
|
||||
try:
|
||||
asyncio.create_task(self._sse_response.aclose())
|
||||
task = asyncio.create_task(self._sse_response.aclose())
|
||||
self._background_tasks.add(task)
|
||||
task.add_done_callback(self._background_tasks.discard)
|
||||
except Exception:
|
||||
pass
|
||||
self._sse_response = None
|
||||
|
|
|
|||
|
|
@ -72,6 +72,7 @@ class SlackAdapter(BasePlatformAdapter):
|
|||
self._handler: Optional[AsyncSocketModeHandler] = None
|
||||
self._bot_user_id: Optional[str] = None
|
||||
self._user_name_cache: Dict[str, str] = {} # user_id → display name
|
||||
self._socket_mode_task: Optional[asyncio.Task] = None
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to Slack via Socket Mode."""
|
||||
|
|
@ -119,7 +120,7 @@ class SlackAdapter(BasePlatformAdapter):
|
|||
|
||||
# Start Socket Mode handler in background
|
||||
self._handler = AsyncSocketModeHandler(self._app, app_token)
|
||||
asyncio.create_task(self._handler.start_async())
|
||||
self._socket_mode_task = asyncio.create_task(self._handler.start_async())
|
||||
|
||||
self._running = True
|
||||
logger.info("[Slack] Connected as @%s (Socket Mode)", bot_name)
|
||||
|
|
|
|||
|
|
@ -265,7 +265,9 @@ class SmsAdapter(BasePlatformAdapter):
|
|||
)
|
||||
|
||||
# Non-blocking: Twilio expects a fast response
|
||||
asyncio.create_task(self.handle_message(event))
|
||||
task = asyncio.create_task(self.handle_message(event))
|
||||
self._background_tasks.add(task)
|
||||
task.add_done_callback(self._background_tasks.discard)
|
||||
|
||||
# Return empty TwiML — we send replies via the REST API, not inline TwiML
|
||||
return web.Response(
|
||||
|
|
|
|||
|
|
@ -363,7 +363,9 @@ class WebhookAdapter(BasePlatformAdapter):
|
|||
)
|
||||
|
||||
# Non-blocking — return 202 Accepted immediately
|
||||
asyncio.create_task(self.handle_message(event))
|
||||
task = asyncio.create_task(self.handle_message(event))
|
||||
self._background_tasks.add(task)
|
||||
task.add_done_callback(self._background_tasks.discard)
|
||||
|
||||
return web.json_response(
|
||||
{
|
||||
|
|
|
|||
|
|
@ -140,6 +140,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
|||
self._message_queue: asyncio.Queue = asyncio.Queue()
|
||||
self._bridge_log_fh = None
|
||||
self._bridge_log: Optional[Path] = None
|
||||
self._poll_task: Optional[asyncio.Task] = None
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""
|
||||
|
|
@ -198,7 +199,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
|||
print(f"[{self.name}] Using existing bridge (status: {bridge_status})")
|
||||
self._mark_connected()
|
||||
self._bridge_process = None # Not managed by us
|
||||
asyncio.create_task(self._poll_messages())
|
||||
self._poll_task = asyncio.create_task(self._poll_messages())
|
||||
return True
|
||||
else:
|
||||
print(f"[{self.name}] Bridge found but not connected (status: {bridge_status}), restarting")
|
||||
|
|
@ -304,7 +305,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
|||
print(f"[{self.name}] If session expired, re-pair: hermes whatsapp")
|
||||
|
||||
# Start message polling task
|
||||
asyncio.create_task(self._poll_messages())
|
||||
self._poll_task = asyncio.create_task(self._poll_messages())
|
||||
|
||||
self._mark_connected()
|
||||
print(f"[{self.name}] Bridge started on port {self._bridge_port}")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue