diff --git a/gateway/run.py b/gateway/run.py index 1dcb87d85..a5876bc14 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -8010,26 +8010,43 @@ class GatewayRunner: tracking_task = asyncio.create_task(track_agent()) - # Monitor for interrupts from the adapter (new messages arriving) + # Monitor for interrupts from the adapter (new messages arriving). + # This is the PRIMARY interrupt path for regular text messages — + # Level 1 (base.py) catches them before _handle_message() is reached, + # so the Level 2 running_agent.interrupt() path never fires. + # The inactivity poll loop below has a BACKUP check in case this + # task dies or misses an interrupt despite its error handling. + _interrupt_detected = asyncio.Event() # shared with backup check + async def monitor_for_interrupt(): - adapter = self.adapters.get(source.platform) - if not adapter or not session_key: + if not session_key: return - + while True: await asyncio.sleep(0.2) # Check every 200ms - # Check if adapter has a pending interrupt for this session. - # Must use session_key (build_session_key output) — NOT - # source.chat_id — because the adapter stores interrupt events - # under the full session key. - if hasattr(adapter, 'has_pending_interrupt') and adapter.has_pending_interrupt(session_key): - agent = agent_holder[0] - if agent: - pending_event = adapter.get_pending_message(session_key) - pending_text = pending_event.text if pending_event else None - logger.debug("Interrupt detected from adapter, signaling agent...") - agent.interrupt(pending_text) - break + try: + # Re-resolve adapter each iteration so reconnects don't + # leave us holding a stale reference. + _adapter = self.adapters.get(source.platform) + if not _adapter: + continue + # Check if adapter has a pending interrupt for this session. + # Must use session_key (build_session_key output) — NOT + # source.chat_id — because the adapter stores interrupt events + # under the full session key. 
+ if hasattr(_adapter, 'has_pending_interrupt') and _adapter.has_pending_interrupt(session_key): + agent = agent_holder[0] + if agent: + pending_event = _adapter.get_pending_message(session_key) + pending_text = pending_event.text if pending_event else None + logger.debug("Interrupt detected from adapter, signaling agent...") + agent.interrupt(pending_text) + _interrupt_detected.set() + break + except asyncio.CancelledError: + raise + except Exception as _mon_err: + logger.debug("monitor_for_interrupt error (will retry): %s", _mon_err) interrupt_monitor = asyncio.create_task(monitor_for_interrupt()) @@ -8094,8 +8111,34 @@ class GatewayRunner: _POLL_INTERVAL = 5.0 if _agent_timeout is None: - # Unlimited — just await the result. - response = await _executor_task + # Unlimited — still poll periodically for backup interrupt + # detection in case monitor_for_interrupt() silently died. + response = None + while True: + done, _ = await asyncio.wait( + {_executor_task}, timeout=_POLL_INTERVAL + ) + if done: + response = _executor_task.result() + break + # Backup interrupt check: if the monitor task died or + # missed the interrupt, catch it here. 
+ if not _interrupt_detected.is_set() and session_key: + _backup_adapter = self.adapters.get(source.platform) + _backup_agent = agent_holder[0] + if (_backup_adapter and _backup_agent + and hasattr(_backup_adapter, 'has_pending_interrupt') + and _backup_adapter.has_pending_interrupt(session_key)): + _bp_event = _backup_adapter.get_pending_message(session_key) + _bp_text = _bp_event.text if _bp_event else None + logger.info( + "Backup interrupt detected for session %s " + "(monitor task state: %s)", + session_key[:20], + "done" if interrupt_monitor.done() else "running", + ) + _backup_agent.interrupt(_bp_text) + _interrupt_detected.set() else: # Poll loop: check the agent's built-in activity tracker # (updated by _touch_activity() on every tool call, API @@ -8139,6 +8182,23 @@ class GatewayRunner: if _idle_secs >= _agent_timeout: _inactivity_timeout = True break + # Backup interrupt check (same as unlimited path). + if not _interrupt_detected.is_set() and session_key: + _backup_adapter = self.adapters.get(source.platform) + _backup_agent = agent_holder[0] + if (_backup_adapter and _backup_agent + and hasattr(_backup_adapter, 'has_pending_interrupt') + and _backup_adapter.has_pending_interrupt(session_key)): + _bp_event = _backup_adapter.get_pending_message(session_key) + _bp_text = _bp_event.text if _bp_event else None + logger.info( + "Backup interrupt detected for session %s " + "(monitor task state: %s)", + session_key[:20], + "done" if interrupt_monitor.done() else "running", + ) + _backup_agent.interrupt(_bp_text) + _interrupt_detected.set() if _inactivity_timeout: # Build a diagnostic summary from the agent's activity tracker.