fix(gateway): peek at pending message during interrupt instead of consuming it

The monitor_for_interrupt() and backup interrupt checks were calling
get_pending_message() which pops the message from the adapter's queue.
This created a race condition: if the agent finished naturally before
checking _interrupt_requested, the pending message was permanently lost.

Timeline of the race:
1. Agent near completion, user sends message
2. Level 1 guard stores message in adapter._pending_messages, sets event
3. monitor_for_interrupt() detects event, POPS message, calls agent.interrupt()
4. Agent's run_conversation() was already returning (interrupted=False)
5. Post-run dequeue finds nothing (monitor already consumed it)
6. result.get('interrupted') is False so interrupt_message fallback doesn't fire
7. User message permanently lost — agent finishes without processing it

Fix: change all three interrupt detection sites (primary monitor + two
backup checks) from get_pending_message() (pop) to
_pending_messages.get() (peek). The message stays in the adapter's queue
until _dequeue_pending_event() consumes it in the post-run handler,
which runs regardless of whether the agent was interrupted or finished
naturally.

Reported by @_SushantSays — intermittent message loss during long
terminal command execution, persisting after the previous fix (73f970fa)
which addressed monitor task death but not this consumption race.
This commit is contained in:
Teknium 2026-04-12 01:55:31 -07:00 committed by Teknium
parent ae6820a45a
commit 4aa534eae5

View file

@ -8046,8 +8046,16 @@ class GatewayRunner:
if hasattr(_adapter, 'has_pending_interrupt') and _adapter.has_pending_interrupt(session_key):
agent = agent_holder[0]
if agent:
pending_event = _adapter.get_pending_message(session_key)
pending_text = pending_event.text if pending_event else None
# Peek at the pending message text WITHOUT consuming it.
# The message must remain in _pending_messages so the
# post-run dequeue at _dequeue_pending_event() can
# retrieve the full MessageEvent (with media metadata).
# If we pop here, a race exists: the agent may finish
# before checking _interrupt_requested, and the message
# is lost — neither the interrupt path nor the dequeue
# path finds it.
_peek_event = _adapter._pending_messages.get(session_key)
pending_text = _peek_event.text if _peek_event else None
logger.debug("Interrupt detected from adapter, signaling agent...")
agent.interrupt(pending_text)
_interrupt_detected.set()
@ -8138,7 +8146,7 @@ class GatewayRunner:
if (_backup_adapter and _backup_agent
and hasattr(_backup_adapter, 'has_pending_interrupt')
and _backup_adapter.has_pending_interrupt(session_key)):
_bp_event = _backup_adapter.get_pending_message(session_key)
_bp_event = _backup_adapter._pending_messages.get(session_key)
_bp_text = _bp_event.text if _bp_event else None
logger.info(
"Backup interrupt detected for session %s "
@ -8198,7 +8206,7 @@ class GatewayRunner:
if (_backup_adapter and _backup_agent
and hasattr(_backup_adapter, 'has_pending_interrupt')
and _backup_adapter.has_pending_interrupt(session_key)):
_bp_event = _backup_adapter.get_pending_message(session_key)
_bp_event = _backup_adapter._pending_messages.get(session_key)
_bp_text = _bp_event.text if _bp_event else None
logger.info(
"Backup interrupt detected for session %s "