mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(approval): heartbeat activity during gateway approval wait (#11245)
The blocking gateway approval wait at tools/approval.py called `entry.event.wait(timeout=...)` which never touched the agent's activity tracker. When a user was slow to respond to a /approve prompt (or the gateway_timeout config was set higher than the default 300s), the agent thread sat silent long enough for the gateway's inactivity watchdog (agent.gateway_timeout, default 1800s) to kill it — even though the agent was doing exactly the right thing and the user was the one causing the delay. The fix polls the event in 1s slices and calls touch_activity_if_due between slices, mirroring the _wait_for_process() pattern in tools/environments/base.py that covers the subprocess-waiting side of the same problem. At the default 10s heartbeat cadence, a 300s approval wait now pings activity ~30 times, well under the 1800s idle threshold. Observed in community user logs: 12 repeated 'Agent idle 1800s, last_activity=executing tool: terminal' events across April 12-14. Companion to PR #10501 which covered streaming / concurrent-tool / Modal-backend gaps but did not touch approval.py. Test: tests/tools/test_approval_heartbeat.py — verifies (1) heartbeats fire during the wait, (2) user responses are still near-instant, and (3) the approval path stays functional when the heartbeat helper can't be imported.
This commit is contained in:
parent
f6179c5d5f
commit
387aa9afc9
2 changed files with 233 additions and 2 deletions
200
tests/tools/test_approval_heartbeat.py
Normal file
200
tests/tools/test_approval_heartbeat.py
Normal file
|
|
@ -0,0 +1,200 @@
|
|||
"""Tests for the activity-heartbeat behavior of the blocking gateway approval wait.
|
||||
|
||||
Regression test for false gateway inactivity timeouts firing while the agent
|
||||
is legitimately blocked waiting for a user to respond to a dangerous-command
|
||||
approval prompt. Before the fix, ``entry.event.wait(timeout=...)`` blocked
|
||||
silently — no ``_touch_activity()`` calls — and the gateway's inactivity
|
||||
watchdog (``agent.gateway_timeout``, default 1800s) would kill the agent
|
||||
while the user was still choosing whether to approve.
|
||||
|
||||
The fix polls the event in short slices and fires ``touch_activity_if_due``
|
||||
between slices, mirroring ``_wait_for_process`` in ``tools/environments/base.py``.
|
||||
"""
|
||||
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
def _clear_approval_state():
|
||||
"""Reset all module-level approval state between tests."""
|
||||
from tools import approval as mod
|
||||
mod._gateway_queues.clear()
|
||||
mod._gateway_notify_cbs.clear()
|
||||
mod._session_approved.clear()
|
||||
mod._permanent_approved.clear()
|
||||
mod._pending.clear()
|
||||
|
||||
|
||||
class TestApprovalHeartbeat:
|
||||
"""The blocking gateway approval wait must fire activity heartbeats.
|
||||
|
||||
Without heartbeats, the gateway's inactivity watchdog kills the agent
|
||||
thread while it's legitimately waiting for a slow user to respond to
|
||||
an approval prompt (observed in real user logs: MRB, April 2026).
|
||||
"""
|
||||
|
||||
SESSION_KEY = "heartbeat-test-session"
|
||||
|
||||
def setup_method(self):
|
||||
_clear_approval_state()
|
||||
self._saved_env = {
|
||||
k: os.environ.get(k)
|
||||
for k in ("HERMES_GATEWAY_SESSION", "HERMES_YOLO_MODE",
|
||||
"HERMES_SESSION_KEY")
|
||||
}
|
||||
os.environ.pop("HERMES_YOLO_MODE", None)
|
||||
os.environ["HERMES_GATEWAY_SESSION"] = "1"
|
||||
# The blocking wait path reads the session key via contextvar OR
|
||||
# os.environ fallback. Contextvars don't propagate across threads
|
||||
# by default, so env var is the portable way to drive this in tests.
|
||||
os.environ["HERMES_SESSION_KEY"] = self.SESSION_KEY
|
||||
|
||||
def teardown_method(self):
|
||||
for k, v in self._saved_env.items():
|
||||
if v is None:
|
||||
os.environ.pop(k, None)
|
||||
else:
|
||||
os.environ[k] = v
|
||||
_clear_approval_state()
|
||||
|
||||
def test_heartbeat_fires_while_waiting_for_approval(self):
|
||||
"""touch_activity_if_due is called repeatedly during the wait."""
|
||||
from tools.approval import (
|
||||
check_all_command_guards,
|
||||
register_gateway_notify,
|
||||
resolve_gateway_approval,
|
||||
)
|
||||
|
||||
register_gateway_notify(self.SESSION_KEY, lambda _payload: None)
|
||||
|
||||
# Use an Event to signal from _fake_touch back to the main thread
|
||||
# so we can resolve as soon as the first heartbeat fires — avoids
|
||||
# flakiness from fixed sleeps racing against thread startup.
|
||||
first_heartbeat = threading.Event()
|
||||
heartbeat_calls: list[str] = []
|
||||
|
||||
def _fake_touch(state, label):
|
||||
# Bypass the 10s throttle so the heartbeat fires every loop
|
||||
# iteration; we're measuring whether the call happens at all.
|
||||
heartbeat_calls.append(label)
|
||||
state["last_touch"] = 0.0
|
||||
first_heartbeat.set()
|
||||
|
||||
result_holder: dict = {}
|
||||
|
||||
def _run_check():
|
||||
try:
|
||||
with patch(
|
||||
"tools.environments.base.touch_activity_if_due",
|
||||
side_effect=_fake_touch,
|
||||
):
|
||||
result_holder["result"] = check_all_command_guards(
|
||||
"rm -rf /tmp/nonexistent-heartbeat-target", "local"
|
||||
)
|
||||
except Exception as exc: # pragma: no cover
|
||||
result_holder["exc"] = exc
|
||||
|
||||
thread = threading.Thread(target=_run_check, daemon=True)
|
||||
thread.start()
|
||||
|
||||
# Wait for at least one heartbeat to fire — bounded at 10s to catch
|
||||
# a genuinely hung worker thread without making a green run slow.
|
||||
assert first_heartbeat.wait(timeout=10.0), (
|
||||
"no heartbeat fired within 10s — the approval wait is blocking "
|
||||
"without firing activity pings, which is the exact bug this "
|
||||
"test exists to catch"
|
||||
)
|
||||
|
||||
# Resolve the approval so the thread exits cleanly.
|
||||
resolve_gateway_approval(self.SESSION_KEY, "once")
|
||||
thread.join(timeout=5)
|
||||
|
||||
assert not thread.is_alive(), "approval wait did not exit after resolve"
|
||||
assert "exc" not in result_holder, (
|
||||
f"check_all_command_guards raised: {result_holder.get('exc')!r}"
|
||||
)
|
||||
|
||||
# The fix: heartbeats fire while waiting. Before the fix this list
|
||||
# was empty because event.wait() blocked for the full timeout with
|
||||
# no activity pings.
|
||||
assert heartbeat_calls, "expected at least one heartbeat"
|
||||
assert all(
|
||||
call == "waiting for user approval" for call in heartbeat_calls
|
||||
), f"unexpected heartbeat labels: {set(heartbeat_calls)}"
|
||||
|
||||
# Sanity: the approval was resolved with "once" → command approved.
|
||||
assert result_holder["result"]["approved"] is True
|
||||
|
||||
def test_wait_returns_immediately_on_user_response(self):
|
||||
"""Polling slices don't delay responsiveness — resolve is near-instant."""
|
||||
from tools.approval import (
|
||||
check_all_command_guards,
|
||||
register_gateway_notify,
|
||||
resolve_gateway_approval,
|
||||
)
|
||||
|
||||
register_gateway_notify(self.SESSION_KEY, lambda _payload: None)
|
||||
|
||||
start_time = time.monotonic()
|
||||
result_holder: dict = {}
|
||||
|
||||
def _run_check():
|
||||
result_holder["result"] = check_all_command_guards(
|
||||
"rm -rf /tmp/nonexistent-fast-target", "local"
|
||||
)
|
||||
|
||||
thread = threading.Thread(target=_run_check, daemon=True)
|
||||
thread.start()
|
||||
|
||||
# Resolve almost immediately — the wait loop should return within
|
||||
# its current 1s poll slice.
|
||||
time.sleep(0.1)
|
||||
resolve_gateway_approval(self.SESSION_KEY, "once")
|
||||
thread.join(timeout=5)
|
||||
elapsed = time.monotonic() - start_time
|
||||
|
||||
assert not thread.is_alive()
|
||||
assert result_holder["result"]["approved"] is True
|
||||
# Generous bound to tolerate CI load; the previous single-wait
|
||||
# impl returned in <10ms, the polling impl is bounded by the 1s
|
||||
# slice length.
|
||||
assert elapsed < 3.0, f"resolution took {elapsed:.2f}s, expected <3s"
|
||||
|
||||
def test_heartbeat_import_failure_does_not_break_wait(self):
|
||||
"""If tools.environments.base can't be imported, the wait still works."""
|
||||
from tools.approval import (
|
||||
check_all_command_guards,
|
||||
register_gateway_notify,
|
||||
resolve_gateway_approval,
|
||||
)
|
||||
|
||||
register_gateway_notify(self.SESSION_KEY, lambda _payload: None)
|
||||
|
||||
result_holder: dict = {}
|
||||
import builtins
|
||||
real_import = builtins.__import__
|
||||
|
||||
def _fail_environments_base(name, *args, **kwargs):
|
||||
if name == "tools.environments.base":
|
||||
raise ImportError("simulated")
|
||||
return real_import(name, *args, **kwargs)
|
||||
|
||||
def _run_check():
|
||||
with patch.object(builtins, "__import__",
|
||||
side_effect=_fail_environments_base):
|
||||
result_holder["result"] = check_all_command_guards(
|
||||
"rm -rf /tmp/nonexistent-import-fail-target", "local"
|
||||
)
|
||||
|
||||
thread = threading.Thread(target=_run_check, daemon=True)
|
||||
thread.start()
|
||||
|
||||
time.sleep(0.2)
|
||||
resolve_gateway_approval(self.SESSION_KEY, "once")
|
||||
thread.join(timeout=5)
|
||||
|
||||
assert not thread.is_alive()
|
||||
# Even when heartbeat import fails, the approval flow completes.
|
||||
assert result_holder["result"]["approved"] is True
|
||||
|
|
@ -14,6 +14,7 @@ import os
|
|||
import re
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import unicodedata
|
||||
from typing import Optional
|
||||
|
||||
|
|
@ -834,13 +835,43 @@ def check_all_command_guards(command: str, env_type: str,
|
|||
"description": combined_desc,
|
||||
}
|
||||
|
||||
# Block until the user responds or timeout (default 5 min)
|
||||
# Block until the user responds or timeout (default 5 min).
|
||||
# Poll in short slices so we can fire activity heartbeats every
|
||||
# ~10s to the agent's inactivity tracker. Without this, the
|
||||
# blocking event.wait() never touches activity, and the
|
||||
# gateway's inactivity watchdog (agent.gateway_timeout, default
|
||||
# 1800s) kills the agent while the user is still responding to
|
||||
# the approval prompt. Mirrors the _wait_for_process() cadence
|
||||
# in tools/environments/base.py.
|
||||
timeout = _get_approval_config().get("gateway_timeout", 300)
|
||||
try:
|
||||
timeout = int(timeout)
|
||||
except (ValueError, TypeError):
|
||||
timeout = 300
|
||||
resolved = entry.event.wait(timeout=timeout)
|
||||
|
||||
try:
|
||||
from tools.environments.base import touch_activity_if_due
|
||||
except Exception: # pragma: no cover
|
||||
touch_activity_if_due = None
|
||||
|
||||
_now = time.monotonic()
|
||||
_deadline = _now + max(timeout, 0)
|
||||
_activity_state = {"last_touch": _now, "start": _now}
|
||||
resolved = False
|
||||
while True:
|
||||
_remaining = _deadline - time.monotonic()
|
||||
if _remaining <= 0:
|
||||
break
|
||||
# 1s poll slice — the event is set immediately when the
|
||||
# user responds, so slice length only controls heartbeat
|
||||
# cadence, not user-visible responsiveness.
|
||||
if entry.event.wait(timeout=min(1.0, _remaining)):
|
||||
resolved = True
|
||||
break
|
||||
if touch_activity_if_due is not None:
|
||||
touch_activity_if_due(
|
||||
_activity_state, "waiting for user approval"
|
||||
)
|
||||
|
||||
# Clean up this entry from the queue
|
||||
with _lock:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue