From 4aa534eae5560a66cc2062c408680197ba01c0cc Mon Sep 17 00:00:00 2001 From: Teknium Date: Sun, 12 Apr 2026 01:55:31 -0700 Subject: [PATCH] fix(gateway): peek at pending message during interrupt instead of consuming it MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The monitor_for_interrupt() and backup interrupt checks were calling get_pending_message() which pops the message from the adapter's queue. This created a race condition: if the agent finished naturally before checking _interrupt_requested, the pending message was permanently lost. Timeline of the race: 1. Agent near completion, user sends message 2. Level 1 guard stores message in adapter._pending_messages, sets event 3. monitor_for_interrupt() detects event, POPS message, calls agent.interrupt() 4. Agent's run_conversation() was already returning (interrupted=False) 5. Post-run dequeue finds nothing (monitor already consumed it) 6. result.get('interrupted') is False so interrupt_message fallback doesn't fire 7. User message permanently lost — agent finishes without processing it Fix: change all three interrupt detection sites (primary monitor + two backup checks) from get_pending_message() (pop) to _pending_messages.get() (peek). The message stays in the adapter's queue until _dequeue_pending_event() consumes it in the post-run handler, which runs regardless of whether the agent was interrupted or finished naturally. Reported by @_SushantSays — intermittent message loss during long terminal command execution, persisting after the previous fix (73f970fa) which addressed monitor task death but not this consumption race. --- gateway/run.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 9cf39a16050..d95d5eb1e1d 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -8046,8 +8046,16 @@ class GatewayRunner: if hasattr(_adapter, 'has_pending_interrupt') and _adapter.has_pending_interrupt(session_key): agent = agent_holder[0] if agent: - pending_event = _adapter.get_pending_message(session_key) - pending_text = pending_event.text if pending_event else None + # Peek at the pending message text WITHOUT consuming it. + # The message must remain in _pending_messages so the + # post-run dequeue at _dequeue_pending_event() can + # retrieve the full MessageEvent (with media metadata). + # If we pop here, a race exists: the agent may finish + # before checking _interrupt_requested, and the message + # is lost — neither the interrupt path nor the dequeue + # path finds it. + _peek_event = _adapter._pending_messages.get(session_key) + pending_text = _peek_event.text if _peek_event else None logger.debug("Interrupt detected from adapter, signaling agent...") agent.interrupt(pending_text) _interrupt_detected.set() @@ -8138,7 +8146,7 @@ class GatewayRunner: if (_backup_adapter and _backup_agent and hasattr(_backup_adapter, 'has_pending_interrupt') and _backup_adapter.has_pending_interrupt(session_key)): - _bp_event = _backup_adapter.get_pending_message(session_key) + _bp_event = _backup_adapter._pending_messages.get(session_key) _bp_text = _bp_event.text if _bp_event else None logger.info( "Backup interrupt detected for session %s " @@ -8198,7 +8206,7 @@ class GatewayRunner: if (_backup_adapter and _backup_agent and hasattr(_backup_adapter, 'has_pending_interrupt') and _backup_adapter.has_pending_interrupt(session_key)): - _bp_event = _backup_adapter.get_pending_message(session_key) + _bp_event = _backup_adapter._pending_messages.get(session_key) _bp_text = _bp_event.text if _bp_event else None logger.info( "Backup interrupt detected for session %s "