mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix: make gateway interrupt detection resilient to monitor task failures
The interrupt mechanism for regular text messages (non-commands) during active agent runs relied on a single async polling task (monitor_for_interrupt) with no error handling. If this task died silently due to an unhandled exception, stale adapter reference after reconnect, or any other failure, user messages sent during agent execution would be queued but never trigger an actual interrupt — the agent would continue running until it finished naturally, then process the queued message. Three improvements: 1. Error handling in monitor_for_interrupt(): wrap the polling body in try/except so transient errors are logged and retried instead of silently killing the task. 2. Fresh adapter reference on each poll iteration: re-resolve self.adapters.get(source.platform) every 200ms instead of capturing the adapter once at task creation time. This prevents stale references after adapter reconnects. 3. Backup interrupt check in the inactivity poll loop: both the unlimited and timeout-enabled paths now check for pending interrupts every 5 seconds (the existing poll interval). Uses a shared _interrupt_detected asyncio.Event to avoid double-firing when the primary monitor already handled the interrupt. Logs at INFO level with monitor task state for debugging.
This commit is contained in:
parent
4cadfef8e3
commit
73f970fa4d
1 changed files with 78 additions and 18 deletions
|
|
@ -8010,26 +8010,43 @@ class GatewayRunner:
|
|||
|
||||
tracking_task = asyncio.create_task(track_agent())
|
||||
|
||||
# Monitor for interrupts from the adapter (new messages arriving)
|
||||
# Monitor for interrupts from the adapter (new messages arriving).
|
||||
# This is the PRIMARY interrupt path for regular text messages —
|
||||
# Level 1 (base.py) catches them before _handle_message() is reached,
|
||||
# so the Level 2 running_agent.interrupt() path never fires.
|
||||
# The inactivity poll loop below has a BACKUP check in case this
|
||||
# task dies (no error handling = silent death = lost interrupts).
|
||||
_interrupt_detected = asyncio.Event() # shared with backup check
|
||||
|
||||
async def monitor_for_interrupt():
|
||||
adapter = self.adapters.get(source.platform)
|
||||
if not adapter or not session_key:
|
||||
if not session_key:
|
||||
return
|
||||
|
||||
|
||||
while True:
|
||||
await asyncio.sleep(0.2) # Check every 200ms
|
||||
# Check if adapter has a pending interrupt for this session.
|
||||
# Must use session_key (build_session_key output) — NOT
|
||||
# source.chat_id — because the adapter stores interrupt events
|
||||
# under the full session key.
|
||||
if hasattr(adapter, 'has_pending_interrupt') and adapter.has_pending_interrupt(session_key):
|
||||
agent = agent_holder[0]
|
||||
if agent:
|
||||
pending_event = adapter.get_pending_message(session_key)
|
||||
pending_text = pending_event.text if pending_event else None
|
||||
logger.debug("Interrupt detected from adapter, signaling agent...")
|
||||
agent.interrupt(pending_text)
|
||||
break
|
||||
try:
|
||||
# Re-resolve adapter each iteration so reconnects don't
|
||||
# leave us holding a stale reference.
|
||||
_adapter = self.adapters.get(source.platform)
|
||||
if not _adapter:
|
||||
continue
|
||||
# Check if adapter has a pending interrupt for this session.
|
||||
# Must use session_key (build_session_key output) — NOT
|
||||
# source.chat_id — because the adapter stores interrupt events
|
||||
# under the full session key.
|
||||
if hasattr(_adapter, 'has_pending_interrupt') and _adapter.has_pending_interrupt(session_key):
|
||||
agent = agent_holder[0]
|
||||
if agent:
|
||||
pending_event = _adapter.get_pending_message(session_key)
|
||||
pending_text = pending_event.text if pending_event else None
|
||||
logger.debug("Interrupt detected from adapter, signaling agent...")
|
||||
agent.interrupt(pending_text)
|
||||
_interrupt_detected.set()
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as _mon_err:
|
||||
logger.debug("monitor_for_interrupt error (will retry): %s", _mon_err)
|
||||
|
||||
interrupt_monitor = asyncio.create_task(monitor_for_interrupt())
|
||||
|
||||
|
|
@ -8094,8 +8111,34 @@ class GatewayRunner:
|
|||
_POLL_INTERVAL = 5.0
|
||||
|
||||
if _agent_timeout is None:
|
||||
# Unlimited — just await the result.
|
||||
response = await _executor_task
|
||||
# Unlimited — still poll periodically for backup interrupt
|
||||
# detection in case monitor_for_interrupt() silently died.
|
||||
response = None
|
||||
while True:
|
||||
done, _ = await asyncio.wait(
|
||||
{_executor_task}, timeout=_POLL_INTERVAL
|
||||
)
|
||||
if done:
|
||||
response = _executor_task.result()
|
||||
break
|
||||
# Backup interrupt check: if the monitor task died or
|
||||
# missed the interrupt, catch it here.
|
||||
if not _interrupt_detected.is_set() and session_key:
|
||||
_backup_adapter = self.adapters.get(source.platform)
|
||||
_backup_agent = agent_holder[0]
|
||||
if (_backup_adapter and _backup_agent
|
||||
and hasattr(_backup_adapter, 'has_pending_interrupt')
|
||||
and _backup_adapter.has_pending_interrupt(session_key)):
|
||||
_bp_event = _backup_adapter.get_pending_message(session_key)
|
||||
_bp_text = _bp_event.text if _bp_event else None
|
||||
logger.info(
|
||||
"Backup interrupt detected for session %s "
|
||||
"(monitor task state: %s)",
|
||||
session_key[:20],
|
||||
"done" if interrupt_monitor.done() else "running",
|
||||
)
|
||||
_backup_agent.interrupt(_bp_text)
|
||||
_interrupt_detected.set()
|
||||
else:
|
||||
# Poll loop: check the agent's built-in activity tracker
|
||||
# (updated by _touch_activity() on every tool call, API
|
||||
|
|
@ -8139,6 +8182,23 @@ class GatewayRunner:
|
|||
if _idle_secs >= _agent_timeout:
|
||||
_inactivity_timeout = True
|
||||
break
|
||||
# Backup interrupt check (same as unlimited path).
|
||||
if not _interrupt_detected.is_set() and session_key:
|
||||
_backup_adapter = self.adapters.get(source.platform)
|
||||
_backup_agent = agent_holder[0]
|
||||
if (_backup_adapter and _backup_agent
|
||||
and hasattr(_backup_adapter, 'has_pending_interrupt')
|
||||
and _backup_adapter.has_pending_interrupt(session_key)):
|
||||
_bp_event = _backup_adapter.get_pending_message(session_key)
|
||||
_bp_text = _bp_event.text if _bp_event else None
|
||||
logger.info(
|
||||
"Backup interrupt detected for session %s "
|
||||
"(monitor task state: %s)",
|
||||
session_key[:20],
|
||||
"done" if interrupt_monitor.done() else "running",
|
||||
)
|
||||
_backup_agent.interrupt(_bp_text)
|
||||
_interrupt_detected.set()
|
||||
|
||||
if _inactivity_timeout:
|
||||
# Build a diagnostic summary from the agent's activity tracker.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue