mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(discord): shield text-batch flush from follow-up cancel (#12444)
When Discord splits a long message at 2000 chars, _enqueue_text_event buffers each chunk and schedules a _flush_text_batch task with a short delay. If another chunk lands while the prior flush task is already inside handle_message, _enqueue_text_event calls prior_task.cancel() — and without asyncio.shield, CancelledError propagates from the flush task into handle_message → the agent's streaming request, aborting the response the user was waiting on.

Reproducer: user sends a 3000-char prompt (split by Discord into 2 messages). Chunk 1 lands, flush delay starts, chunk 2 lands during the brief window when chunk 1's flush has already committed to handle_message. Agent's current streaming response is cancelled with CancelledError, user sees a truncated or missing reply.

Fix (gateway/platforms/discord.py):
- Wrap the handle_message call in asyncio.shield so the inner dispatch is protected from the outer task's cancel.
- Add an except asyncio.CancelledError clause so the outer task still exits cleanly when cancel lands during the sleep window (before the pop) — semantics for that path are unchanged.

The new flush task spawned by the follow-up chunk still handles its own batch via the normal pending-message / active-session machinery in base.py, so follow-ups are not lost.

Tests: tests/gateway/test_text_batching.py — test_shield_protects_handle_message_from_cancel. Tracks a distinct first_handle_cancelled event so the assertion fails cleanly when the shield is missing (verified by stashing the fix and re-running).

Live E2E on the live-loaded DiscordAdapter:
- first_handle_cancelled: False (shield worked)
- first_handle_completed: True (handle_message ran to completion)
This commit is contained in:
parent
dca439fe92
commit
7c10761dd2
2 changed files with 78 additions and 1 deletions
|
|
@ -3265,7 +3265,20 @@ class DiscordAdapter(BasePlatformAdapter):
|
||||||
"[Discord] Flushing text batch %s (%d chars)",
|
"[Discord] Flushing text batch %s (%d chars)",
|
||||||
key, len(event.text or ""),
|
key, len(event.text or ""),
|
||||||
)
|
)
|
||||||
await self.handle_message(event)
|
# Shield the downstream dispatch so that a subsequent chunk
|
||||||
|
# arriving while handle_message is mid-flight cannot cancel
|
||||||
|
# the running agent turn. _enqueue_text_event always cancels
|
||||||
|
# the prior flush task when a new chunk lands; without this
|
||||||
|
# shield, CancelledError would propagate from our task down
|
||||||
|
# into handle_message → the agent's streaming request,
|
||||||
|
# aborting the response the user was waiting on. The new
|
||||||
|
# chunk is handled by the fresh flush task regardless.
|
||||||
|
await asyncio.shield(self.handle_message(event))
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
# Only reached if cancel landed before the pop — the shielded
|
||||||
|
# handle_message is unaffected either way. Let the task exit
|
||||||
|
# cleanly so the finally block cleans up.
|
||||||
|
pass
|
||||||
finally:
|
finally:
|
||||||
if self._pending_text_batch_tasks.get(key) is current_task:
|
if self._pending_text_batch_tasks.get(key) is current_task:
|
||||||
self._pending_text_batch_tasks.pop(key, None)
|
self._pending_text_batch_tasks.pop(key, None)
|
||||||
|
|
|
||||||
|
|
@ -148,6 +148,70 @@ class TestDiscordTextBatching:
|
||||||
await asyncio.sleep(0.25)
|
await asyncio.sleep(0.25)
|
||||||
adapter.handle_message.assert_called_once()
|
adapter.handle_message.assert_called_once()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_shield_protects_handle_message_from_cancel(self):
    """A follow-up chunk must not cancel an in-flight handle_message.

    _enqueue_text_event cancels the previous flush task whenever a new
    chunk arrives. Without asyncio.shield around handle_message, that
    CancelledError would reach the agent's streaming request and abort
    the reply the user is waiting on. This test pins the shield in place.
    """
    adapter = _make_discord_adapter()

    entered_first_call = asyncio.Event()
    allow_first_return = asyncio.Event()
    cancel_observed = asyncio.Event()
    completed_normally = asyncio.Event()
    calls_seen = 0

    async def blocking_handle(event):
        nonlocal calls_seen
        calls_seen += 1
        if calls_seen != 1:
            # Later calls (the follow-up batch) are out of scope here;
            # let them finish instantly.
            return
        entered_first_call.set()
        try:
            await allow_first_return.wait()
        except asyncio.CancelledError:
            cancel_observed.set()
            raise
        completed_normally.set()

    adapter.handle_message = blocking_handle

    # Batch 1: enqueue and wait until its flush task is inside
    # handle_message.
    adapter._enqueue_text_event(_make_event("batch 1", Platform.DISCORD))
    await asyncio.wait_for(entered_first_call.wait(), timeout=1.0)

    # Batch 2 arrives while batch 1 is mid-dispatch; _enqueue_text_event
    # fires prior_task.cancel() on batch 1's flush task right away.
    adapter._enqueue_text_event(_make_event("batch 2 follow-up", Platform.DISCORD))

    # Give the cancellation a moment to propagate (or not).
    await asyncio.sleep(0.05)

    # The shield must have absorbed the cancel before it reached the
    # in-flight dispatch; without asyncio.shield this fires because the
    # CancelledError lands inside blocking_handle.
    assert not cancel_observed.is_set(), (
        "handle_message for batch 1 was cancelled by a follow-up "
        "chunk — asyncio.shield is missing or broken"
    )

    # Unblock batch 1 and confirm it finishes normally.
    allow_first_return.set()
    await asyncio.wait_for(completed_normally.wait(), timeout=1.0)
    assert completed_normally.is_set()

    # Tear down any flush tasks still pending.
    for pending in list(adapter._pending_text_batch_tasks.values()):
        pending.cancel()
    await asyncio.sleep(0.01)
# =====================================================================
|
# =====================================================================
|
||||||
# Matrix text batching
|
# Matrix text batching
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue