mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(interrupt): propagate to concurrent-tool workers + opt-in debug trace (#11907)
* fix(interrupt): propagate to concurrent-tool workers + opt-in debug trace
interrupt() previously only flagged the agent's _execution_thread_id.
Tools running inside _execute_tool_calls_concurrent execute on
ThreadPoolExecutor worker threads whose tids are distinct from the
agent's, so is_interrupted() inside those tools returned False no matter
how many times the gateway called .interrupt() — hung ssh / curl / long
make-builds ran to their own timeout.
Changes:
- run_agent.py: track concurrent-tool worker tids in a per-agent set,
fan interrupt()/clear_interrupt() out to them, and handle the
register-after-interrupt race at _run_tool entry. getattr fallback
for the tracker so test stubs built via object.__new__ keep working.
- tools/environments/base.py: opt-in _wait_for_process trace (ENTER,
per-30s HEARTBEAT with interrupt+activity-cb state, INTERRUPT
DETECTED, TIMEOUT, EXIT) behind HERMES_DEBUG_INTERRUPT=1.
- tools/interrupt.py: opt-in set_interrupt() trace (caller tid, target
tid, set snapshot) behind the same env flag.
- tests: new regression test runs a polling tool on a concurrent worker
and asserts is_interrupted() flips to True within ~1s of interrupt().
Second new test guards clear_interrupt() clearing tracked worker bits.
Validation: tests/run_agent/ all 762 pass; tests/tools/ interrupt+env
subset 216 pass.
* fix(interrupt-debug): bypass quiet_mode logger filter so trace reaches agent.log
AIAgent.__init__ sets logging.getLogger('tools').setLevel(ERROR) when
quiet_mode=True (the CLI default). This would silently swallow every
INFO-level trace line from the HERMES_DEBUG_INTERRUPT=1 instrumentation
added in the parent commit — confirmed by running hermes chat -q with
the flag and finding zero trace lines in agent.log even though
_wait_for_process was clearly executing (subprocess pid existed).
Fix: when HERMES_DEBUG_INTERRUPT=1, each traced module explicitly sets
its own logger level to INFO at import time, overriding the 'tools'
parent-level filter. Scoped to the opt-in case only, so production
(quiet_mode default) logs stay quiet as designed.
Validation: hermes chat -q with HERMES_DEBUG_INTERRUPT=1 now writes
'_wait_for_process ENTER/EXIT' lines to agent.log as expected.
* fix(cli): SIGTERM/SIGHUP no longer orphans tool subprocesses
Tool subprocesses spawned by the local environment backend use
os.setsid so they run in their own process group. Before this fix,
SIGTERM/SIGHUP to the hermes CLI killed the main thread via
KeyboardInterrupt but the worker thread running _wait_for_process
never got a chance to call _kill_process — Python exited, the child
was reparented to init (PPID=1), and the subprocess ran to its
natural end (confirmed live: sleep 300 survived 4+ min after SIGTERM
to the agent until manual cleanup).
Changes:
- cli.py _signal_handler (interactive) + _signal_handler_q (-q mode):
route SIGTERM/SIGHUP through agent.interrupt() so the worker's poll
loop sees the per-thread interrupt flag and calls _kill_process
(os.killpg) on the subprocess group. HERMES_SIGTERM_GRACE (default
1.5s) gives the worker time to complete its SIGTERM+SIGKILL
escalation before KeyboardInterrupt unwinds main.
- tools/environments/base.py _wait_for_process: wrap the poll loop in
try/except (KeyboardInterrupt, SystemExit) so the cleanup fires
even on paths the signal handlers don't cover (direct sys.exit,
unhandled KI from nested code, etc.). Emits EXCEPTION_EXIT trace
line when HERMES_DEBUG_INTERRUPT=1.
- New regression test: injects KeyboardInterrupt into a running
_wait_for_process via PyThreadState_SetAsyncExc, verifies the
subprocess process group is dead within 3s of the exception and
that KeyboardInterrupt re-raises cleanly afterward.
Validation:
| Before | After |
|---------------------------------------------------------|--------------------|
| sleep 300 survives 4+ min as PPID=1 orphan after SIGTERM | dies within 2 s |
| No INTERRUPT DETECTED in trace | INTERRUPT DETECTED fires + killing process group |
| tests/tools/test_local_interrupt_cleanup | 1/1 pass |
| tests/run_agent/test_concurrent_interrupt | 4/4 pass |
This commit is contained in:
parent
607be54a24
commit
20f2258f34
6 changed files with 551 additions and 22 deletions
69
cli.py
69
cli.py
|
|
@@ -10067,8 +10067,36 @@ class HermesCLI:
|
||||||
|
|
||||||
# Register signal handlers for graceful shutdown on SSH disconnect / SIGTERM
|
# Register signal handlers for graceful shutdown on SSH disconnect / SIGTERM
|
||||||
def _signal_handler(signum, frame):
|
def _signal_handler(signum, frame):
|
||||||
"""Handle SIGHUP/SIGTERM by triggering graceful cleanup."""
|
"""Handle SIGHUP/SIGTERM by triggering graceful cleanup.
|
||||||
|
|
||||||
|
Calls ``self.agent.interrupt()`` first so the agent daemon
|
||||||
|
thread's poll loop sees the per-thread interrupt and kills the
|
||||||
|
tool's subprocess group via ``_kill_process`` (os.killpg).
|
||||||
|
Without this, the main thread dies from KeyboardInterrupt and
|
||||||
|
the daemon thread is killed with it — before it can run one
|
||||||
|
more poll iteration to clean up the subprocess, which was
|
||||||
|
spawned with ``os.setsid`` and therefore survives as an orphan
|
||||||
|
with PPID=1.
|
||||||
|
|
||||||
|
Grace window (``HERMES_SIGTERM_GRACE``, default 1.5 s) gives
|
||||||
|
the daemon time to: detect the interrupt (next 200 ms poll) →
|
||||||
|
call _kill_process (SIGTERM + 1 s wait + SIGKILL if needed) →
|
||||||
|
return from _wait_for_process. ``time.sleep`` releases the
|
||||||
|
GIL so the daemon actually runs during the window.
|
||||||
|
"""
|
||||||
logger.debug("Received signal %s, triggering graceful shutdown", signum)
|
logger.debug("Received signal %s, triggering graceful shutdown", signum)
|
||||||
|
try:
|
||||||
|
if getattr(self, "agent", None) and getattr(self, "_agent_running", False):
|
||||||
|
self.agent.interrupt(f"received signal {signum}")
|
||||||
|
import time as _t
|
||||||
|
try:
|
||||||
|
_grace = float(os.getenv("HERMES_SIGTERM_GRACE", "1.5"))
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
_grace = 1.5
|
||||||
|
if _grace > 0:
|
||||||
|
_t.sleep(_grace)
|
||||||
|
except Exception:
|
||||||
|
pass # never block signal handling
|
||||||
raise KeyboardInterrupt()
|
raise KeyboardInterrupt()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
@@ -10372,6 +10400,45 @@ def main(
|
||||||
# Register cleanup for single-query mode (interactive mode registers in run())
|
# Register cleanup for single-query mode (interactive mode registers in run())
|
||||||
atexit.register(_run_cleanup)
|
atexit.register(_run_cleanup)
|
||||||
|
|
||||||
|
# Also install signal handlers in single-query / `-q` mode. Interactive
|
||||||
|
# mode registers its own inside HermesCLI.run(), but `-q` runs
|
||||||
|
# cli.agent.run_conversation() below and AIAgent spawns worker threads
|
||||||
|
# for tools — so when SIGTERM arrives on the main thread, raising
|
||||||
|
# KeyboardInterrupt only unwinds the main thread, not the worker
|
||||||
|
# running _wait_for_process. Python then exits, the child subprocess
|
||||||
|
# (spawned with os.setsid, its own process group) is reparented to
|
||||||
|
# init and keeps running as an orphan.
|
||||||
|
#
|
||||||
|
# Fix: route SIGTERM/SIGHUP through agent.interrupt() which sets the
|
||||||
|
# per-thread interrupt flag the worker's poll loop checks every 200 ms.
|
||||||
|
# Give the worker a grace window to call _kill_process (SIGTERM to the
|
||||||
|
# process group, then SIGKILL after 1 s), then raise KeyboardInterrupt
|
||||||
|
# so main unwinds normally. HERMES_SIGTERM_GRACE overrides the 1.5 s
|
||||||
|
# default for debugging.
|
||||||
|
def _signal_handler_q(signum, frame):
|
||||||
|
logger.debug("Received signal %s in single-query mode", signum)
|
||||||
|
try:
|
||||||
|
_agent = getattr(cli, "agent", None)
|
||||||
|
if _agent is not None:
|
||||||
|
_agent.interrupt(f"received signal {signum}")
|
||||||
|
import time as _t
|
||||||
|
try:
|
||||||
|
_grace = float(os.getenv("HERMES_SIGTERM_GRACE", "1.5"))
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
_grace = 1.5
|
||||||
|
if _grace > 0:
|
||||||
|
_t.sleep(_grace)
|
||||||
|
except Exception:
|
||||||
|
pass # never block signal handling
|
||||||
|
raise KeyboardInterrupt()
|
||||||
|
try:
|
||||||
|
import signal as _signal
|
||||||
|
_signal.signal(_signal.SIGTERM, _signal_handler_q)
|
||||||
|
if hasattr(_signal, "SIGHUP"):
|
||||||
|
_signal.signal(_signal.SIGHUP, _signal_handler_q)
|
||||||
|
except Exception:
|
||||||
|
pass # signal handler may fail in restricted environments
|
||||||
|
|
||||||
# Handle single query mode
|
# Handle single query mode
|
||||||
if query or image:
|
if query or image:
|
||||||
query, single_query_images = _collect_query_images(query, image)
|
query, single_query_images = _collect_query_images(query, image)
|
||||||
|
|
|
||||||
72
run_agent.py
72
run_agent.py
|
|
@@ -832,6 +832,16 @@ class AIAgent:
|
||||||
self._interrupt_thread_signal_pending = False
|
self._interrupt_thread_signal_pending = False
|
||||||
self._client_lock = threading.RLock()
|
self._client_lock = threading.RLock()
|
||||||
|
|
||||||
|
# Concurrent-tool worker thread tracking. `_execute_tool_calls_concurrent`
|
||||||
|
# runs each tool on its own ThreadPoolExecutor worker — those worker
|
||||||
|
# threads have tids distinct from `_execution_thread_id`, so
|
||||||
|
# `_set_interrupt(True, _execution_thread_id)` alone does NOT cause
|
||||||
|
# `is_interrupted()` inside the worker to return True. Track the
|
||||||
|
# workers here so `interrupt()` / `clear_interrupt()` can fan out to
|
||||||
|
# their tids explicitly.
|
||||||
|
self._tool_worker_threads: set[int] = set()
|
||||||
|
self._tool_worker_threads_lock = threading.Lock()
|
||||||
|
|
||||||
# Subagent delegation state
|
# Subagent delegation state
|
||||||
self._delegate_depth = 0 # 0 = top-level agent, incremented for children
|
self._delegate_depth = 0 # 0 = top-level agent, incremented for children
|
||||||
self._active_children = [] # Running child AIAgents (for interrupt propagation)
|
self._active_children = [] # Running child AIAgents (for interrupt propagation)
|
||||||
|
|
@@ -3191,6 +3201,25 @@ class AIAgent:
|
||||||
# interrupt signal until startup completes instead of targeting
|
# interrupt signal until startup completes instead of targeting
|
||||||
# the caller thread by mistake.
|
# the caller thread by mistake.
|
||||||
self._interrupt_thread_signal_pending = True
|
self._interrupt_thread_signal_pending = True
|
||||||
|
# Fan out to concurrent-tool worker threads. Those workers run tools
|
||||||
|
# on their own tids (ThreadPoolExecutor workers), so `is_interrupted()`
|
||||||
|
# inside a tool only sees an interrupt when their specific tid is in
|
||||||
|
# the `_interrupted_threads` set. Without this propagation, an
|
||||||
|
# already-running concurrent tool (e.g. a terminal command hung on
|
||||||
|
# network I/O) never notices the interrupt and has to run to its own
|
||||||
|
# timeout. See `_run_tool` for the matching entry/exit bookkeeping.
|
||||||
|
# `getattr` fallback covers test stubs that build AIAgent via
|
||||||
|
# object.__new__ and skip __init__.
|
||||||
|
_tracker = getattr(self, "_tool_worker_threads", None)
|
||||||
|
_tracker_lock = getattr(self, "_tool_worker_threads_lock", None)
|
||||||
|
if _tracker is not None and _tracker_lock is not None:
|
||||||
|
with _tracker_lock:
|
||||||
|
_worker_tids = list(_tracker)
|
||||||
|
for _wtid in _worker_tids:
|
||||||
|
try:
|
||||||
|
_set_interrupt(True, _wtid)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
# Propagate interrupt to any running child agents (subagent delegation)
|
# Propagate interrupt to any running child agents (subagent delegation)
|
||||||
with self._active_children_lock:
|
with self._active_children_lock:
|
||||||
children_copy = list(self._active_children)
|
children_copy = list(self._active_children)
|
||||||
|
|
@@ -3209,6 +3238,23 @@ class AIAgent:
|
||||||
self._interrupt_thread_signal_pending = False
|
self._interrupt_thread_signal_pending = False
|
||||||
if self._execution_thread_id is not None:
|
if self._execution_thread_id is not None:
|
||||||
_set_interrupt(False, self._execution_thread_id)
|
_set_interrupt(False, self._execution_thread_id)
|
||||||
|
# Also clear any concurrent-tool worker thread bits. Tracked
|
||||||
|
# workers normally clear their own bit on exit, but an explicit
|
||||||
|
# clear here guarantees no stale interrupt can survive a turn
|
||||||
|
# boundary and fire on a subsequent, unrelated tool call that
|
||||||
|
# happens to get scheduled onto the same recycled worker tid.
|
||||||
|
# `getattr` fallback covers test stubs that build AIAgent via
|
||||||
|
# object.__new__ and skip __init__.
|
||||||
|
_tracker = getattr(self, "_tool_worker_threads", None)
|
||||||
|
_tracker_lock = getattr(self, "_tool_worker_threads_lock", None)
|
||||||
|
if _tracker is not None and _tracker_lock is not None:
|
||||||
|
with _tracker_lock:
|
||||||
|
_worker_tids = list(_tracker)
|
||||||
|
for _wtid in _worker_tids:
|
||||||
|
try:
|
||||||
|
_set_interrupt(False, _wtid)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
def _touch_activity(self, desc: str) -> None:
|
def _touch_activity(self, desc: str) -> None:
|
||||||
"""Update the last-activity timestamp and description (thread-safe)."""
|
"""Update the last-activity timestamp and description (thread-safe)."""
|
||||||
|
|
@@ -7653,6 +7699,22 @@ class AIAgent:
|
||||||
|
|
||||||
def _run_tool(index, tool_call, function_name, function_args):
|
def _run_tool(index, tool_call, function_name, function_args):
|
||||||
"""Worker function executed in a thread."""
|
"""Worker function executed in a thread."""
|
||||||
|
# Register this worker tid so the agent can fan out an interrupt
|
||||||
|
# to it — see AIAgent.interrupt(). Must happen first thing, and
|
||||||
|
# must be paired with discard + clear in the finally block.
|
||||||
|
_worker_tid = threading.current_thread().ident
|
||||||
|
with self._tool_worker_threads_lock:
|
||||||
|
self._tool_worker_threads.add(_worker_tid)
|
||||||
|
# Race: if the agent was interrupted between fan-out (which
|
||||||
|
# snapshotted an empty/earlier set) and our registration, apply
|
||||||
|
# the interrupt to our own tid now so is_interrupted() inside
|
||||||
|
# the tool returns True on the next poll.
|
||||||
|
if self._interrupt_requested:
|
||||||
|
try:
|
||||||
|
from tools.interrupt import set_interrupt as _sif
|
||||||
|
_sif(True, _worker_tid)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
# Set the activity callback on THIS worker thread so
|
# Set the activity callback on THIS worker thread so
|
||||||
# _wait_for_process (terminal commands) can fire heartbeats.
|
# _wait_for_process (terminal commands) can fire heartbeats.
|
||||||
# The callback is thread-local; the main thread's callback
|
# The callback is thread-local; the main thread's callback
|
||||||
|
|
@@ -7675,6 +7737,16 @@ class AIAgent:
|
||||||
else:
|
else:
|
||||||
logger.info("tool %s completed (%.2fs, %d chars)", function_name, duration, len(result))
|
logger.info("tool %s completed (%.2fs, %d chars)", function_name, duration, len(result))
|
||||||
results[index] = (function_name, function_args, result, duration, is_error)
|
results[index] = (function_name, function_args, result, duration, is_error)
|
||||||
|
# Tear down worker-tid tracking. Clear any interrupt bit we may
|
||||||
|
# have set so the next task scheduled onto this recycled tid
|
||||||
|
# starts with a clean slate.
|
||||||
|
with self._tool_worker_threads_lock:
|
||||||
|
self._tool_worker_threads.discard(_worker_tid)
|
||||||
|
try:
|
||||||
|
from tools.interrupt import set_interrupt as _sif
|
||||||
|
_sif(False, _worker_tid)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
# Start spinner for CLI mode (skip when TUI handles tool progress)
|
# Start spinner for CLI mode (skip when TUI handles tool progress)
|
||||||
spinner = None
|
spinner = None
|
||||||
|
|
|
||||||
|
|
@@ -23,6 +23,10 @@ def _make_agent(monkeypatch):
|
||||||
|
|
||||||
class _Stub:
|
class _Stub:
|
||||||
_interrupt_requested = False
|
_interrupt_requested = False
|
||||||
|
_interrupt_message = None
|
||||||
|
# Bind to this thread's ident so interrupt() targets a real tid.
|
||||||
|
_execution_thread_id = threading.current_thread().ident
|
||||||
|
_interrupt_thread_signal_pending = False
|
||||||
log_prefix = ""
|
log_prefix = ""
|
||||||
quiet_mode = True
|
quiet_mode = True
|
||||||
verbose_logging = False
|
verbose_logging = False
|
||||||
|
|
@@ -40,6 +44,15 @@ def _make_agent(monkeypatch):
|
||||||
_current_tool = None
|
_current_tool = None
|
||||||
_last_activity = 0
|
_last_activity = 0
|
||||||
_print_fn = print
|
_print_fn = print
|
||||||
|
# Worker-thread tracking state mirrored from AIAgent.__init__ so the
|
||||||
|
# real interrupt() method can fan out to concurrent-tool workers.
|
||||||
|
_active_children: list = []
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
# Instance-level (not class-level) so each test gets a fresh set.
|
||||||
|
self._tool_worker_threads: set = set()
|
||||||
|
self._tool_worker_threads_lock = threading.Lock()
|
||||||
|
self._active_children_lock = threading.Lock()
|
||||||
|
|
||||||
def _touch_activity(self, desc):
|
def _touch_activity(self, desc):
|
||||||
self._last_activity = time.time()
|
self._last_activity = time.time()
|
||||||
|
|
@@ -60,8 +73,10 @@ def _make_agent(monkeypatch):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
stub = _Stub()
|
stub = _Stub()
|
||||||
# Bind the real methods
|
# Bind the real methods under test
|
||||||
stub._execute_tool_calls_concurrent = _ra.AIAgent._execute_tool_calls_concurrent.__get__(stub)
|
stub._execute_tool_calls_concurrent = _ra.AIAgent._execute_tool_calls_concurrent.__get__(stub)
|
||||||
|
stub.interrupt = _ra.AIAgent.interrupt.__get__(stub)
|
||||||
|
stub.clear_interrupt = _ra.AIAgent.clear_interrupt.__get__(stub)
|
||||||
stub._invoke_tool = MagicMock(side_effect=lambda *a, **kw: '{"ok": true}')
|
stub._invoke_tool = MagicMock(side_effect=lambda *a, **kw: '{"ok": true}')
|
||||||
return stub
|
return stub
|
||||||
|
|
||||||
|
|
@@ -137,3 +152,109 @@ def test_concurrent_preflight_interrupt_skips_all(monkeypatch):
|
||||||
assert "skipped due to user interrupt" in messages[1]["content"]
|
assert "skipped due to user interrupt" in messages[1]["content"]
|
||||||
# _invoke_tool should never have been called
|
# _invoke_tool should never have been called
|
||||||
agent._invoke_tool.assert_not_called()
|
agent._invoke_tool.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
def test_running_concurrent_worker_sees_is_interrupted(monkeypatch):
|
||||||
|
"""Regression guard for the "interrupt-doesn't-reach-hung-tool" class of
|
||||||
|
bug Physikal reported in April 2026.
|
||||||
|
|
||||||
|
Before this fix, `AIAgent.interrupt()` called `_set_interrupt(True,
|
||||||
|
_execution_thread_id)` — which only flagged the agent's *main* thread.
|
||||||
|
Tools running inside `_execute_tool_calls_concurrent` execute on
|
||||||
|
ThreadPoolExecutor worker threads whose tids are NOT the agent's, so
|
||||||
|
`is_interrupted()` (which checks the *current* thread's tid) returned
|
||||||
|
False inside those tools no matter how many times the gateway called
|
||||||
|
`.interrupt()`. Hung ssh / long curl / big make-build tools would run
|
||||||
|
to their own timeout.
|
||||||
|
|
||||||
|
This test runs a fake tool in the concurrent path that polls
|
||||||
|
`is_interrupted()` like a real terminal command does, then calls
|
||||||
|
`agent.interrupt()` from another thread, and asserts the poll sees True
|
||||||
|
within one second.
|
||||||
|
"""
|
||||||
|
from tools.interrupt import is_interrupted
|
||||||
|
|
||||||
|
agent = _make_agent(monkeypatch)
|
||||||
|
|
||||||
|
# Counter plus observation hooks so we can prove the worker saw the flip.
|
||||||
|
observed = {"saw_true": False, "poll_count": 0, "worker_tid": None}
|
||||||
|
worker_started = threading.Event()
|
||||||
|
|
||||||
|
def polling_tool(name, args, task_id, call_id=None):
|
||||||
|
observed["worker_tid"] = threading.current_thread().ident
|
||||||
|
worker_started.set()
|
||||||
|
deadline = time.monotonic() + 5.0
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
observed["poll_count"] += 1
|
||||||
|
if is_interrupted():
|
||||||
|
observed["saw_true"] = True
|
||||||
|
return '{"interrupted": true}'
|
||||||
|
time.sleep(0.05)
|
||||||
|
return '{"timed_out": true}'
|
||||||
|
|
||||||
|
agent._invoke_tool = MagicMock(side_effect=polling_tool)
|
||||||
|
|
||||||
|
tc1 = _FakeToolCall("hung_fake_tool_1", call_id="tc1")
|
||||||
|
tc2 = _FakeToolCall("hung_fake_tool_2", call_id="tc2")
|
||||||
|
msg = _FakeAssistantMsg([tc1, tc2])
|
||||||
|
messages = []
|
||||||
|
|
||||||
|
def _interrupt_after_start():
|
||||||
|
# Wait until at least one worker is running so its tid is tracked.
|
||||||
|
worker_started.wait(timeout=2.0)
|
||||||
|
time.sleep(0.2) # let the other worker enter too
|
||||||
|
agent.interrupt("stop requested by test")
|
||||||
|
|
||||||
|
t = threading.Thread(target=_interrupt_after_start)
|
||||||
|
t.start()
|
||||||
|
start = time.monotonic()
|
||||||
|
agent._execute_tool_calls_concurrent(msg, messages, "test_task")
|
||||||
|
elapsed = time.monotonic() - start
|
||||||
|
t.join(timeout=2.0)
|
||||||
|
|
||||||
|
# The worker must have actually polled is_interrupted — otherwise the
|
||||||
|
# test isn't exercising what it claims to.
|
||||||
|
assert observed["poll_count"] > 0, (
|
||||||
|
"polling_tool never ran — test scaffold issue"
|
||||||
|
)
|
||||||
|
# The worker must see the interrupt within ~1 s of agent.interrupt()
|
||||||
|
# being called. Before the fix this loop ran until its 5 s own-timeout.
|
||||||
|
assert observed["saw_true"], (
|
||||||
|
f"is_interrupted() never returned True inside the concurrent worker "
|
||||||
|
f"after agent.interrupt() — interrupt-propagation hole regressed. "
|
||||||
|
f"worker_tid={observed['worker_tid']!r} poll_count={observed['poll_count']}"
|
||||||
|
)
|
||||||
|
assert elapsed < 3.0, (
|
||||||
|
f"concurrent execution took {elapsed:.2f}s after interrupt — the fan-out "
|
||||||
|
f"to worker tids didn't shortcut the tool's poll loop as expected"
|
||||||
|
)
|
||||||
|
# Also verify cleanup: no stale worker tids should remain after all
|
||||||
|
# tools finished.
|
||||||
|
assert agent._tool_worker_threads == set(), (
|
||||||
|
f"worker tids leaked after run: {agent._tool_worker_threads}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_clear_interrupt_clears_worker_tids(monkeypatch):
|
||||||
|
"""After clear_interrupt(), stale worker-tid bits must be cleared so the
|
||||||
|
next turn's tools — which may be scheduled onto recycled tids — don't
|
||||||
|
see a false interrupt."""
|
||||||
|
from tools.interrupt import is_interrupted, set_interrupt
|
||||||
|
|
||||||
|
agent = _make_agent(monkeypatch)
|
||||||
|
# Simulate a worker having registered but not yet exited cleanly (e.g. a
|
||||||
|
# hypothetical bug in the tear-down). Put a fake tid in the set and
|
||||||
|
# flag it interrupted.
|
||||||
|
fake_tid = threading.current_thread().ident # use real tid so is_interrupted can see it
|
||||||
|
with agent._tool_worker_threads_lock:
|
||||||
|
agent._tool_worker_threads.add(fake_tid)
|
||||||
|
set_interrupt(True, fake_tid)
|
||||||
|
assert is_interrupted() is True # sanity
|
||||||
|
|
||||||
|
agent.clear_interrupt()
|
||||||
|
|
||||||
|
assert is_interrupted() is False, (
|
||||||
|
"clear_interrupt() did not clear the interrupt bit for a tracked "
|
||||||
|
"worker tid — stale interrupt can leak into the next turn"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
|
||||||
145
tests/tools/test_local_interrupt_cleanup.py
Normal file
145
tests/tools/test_local_interrupt_cleanup.py
Normal file
|
|
@@ -0,0 +1,145 @@
|
||||||
|
"""Regression tests for _wait_for_process subprocess cleanup on exception exit.
|
||||||
|
|
||||||
|
When the poll loop exits via KeyboardInterrupt or SystemExit (SIGTERM via
|
||||||
|
cli.py signal handler, SIGINT on the main thread in non-interactive -q mode,
|
||||||
|
or explicit sys.exit from some caller), the child subprocess must be killed
|
||||||
|
before the exception propagates — otherwise the local backend's use of
|
||||||
|
os.setsid leaves an orphan with PPID=1.
|
||||||
|
|
||||||
|
The live repro that motivated this: hermes chat -q ... 'sleep 300', SIGTERM
|
||||||
|
to the python process, sleep 300 survived with PPID=1 for the full 300 s
|
||||||
|
because _wait_for_process never got to call _kill_process before python
|
||||||
|
died. See commit message for full context.
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import subprocess
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from tools.environments.local import LocalEnvironment
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _isolate_hermes_home(tmp_path, monkeypatch):
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||||
|
(tmp_path / "logs").mkdir(exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
def _pgid_still_alive(pgid: int) -> bool:
|
||||||
|
"""Return True if any process in the given process group is still alive."""
|
||||||
|
try:
|
||||||
|
os.killpg(pgid, 0) # signal 0 = existence check
|
||||||
|
return True
|
||||||
|
except ProcessLookupError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def test_wait_for_process_kills_subprocess_on_keyboardinterrupt():
|
||||||
|
"""When KeyboardInterrupt arrives mid-poll, the subprocess group must be
|
||||||
|
killed before the exception is re-raised."""
|
||||||
|
env = LocalEnvironment(cwd="/tmp")
|
||||||
|
try:
|
||||||
|
result_holder = {}
|
||||||
|
proc_holder = {}
|
||||||
|
started = threading.Event()
|
||||||
|
raise_at = [None] # set by the main thread to tell worker when
|
||||||
|
|
||||||
|
# Drive execute() on a separate thread so we can SIGNAL-interrupt it
|
||||||
|
# via a thread-targeted exception without killing our test process.
|
||||||
|
def worker():
|
||||||
|
# Spawn a subprocess that will definitely be alive long enough
|
||||||
|
# to observe the cleanup, via env.execute(...) — the normal path
|
||||||
|
# that goes through _wait_for_process.
|
||||||
|
try:
|
||||||
|
result_holder["result"] = env.execute("sleep 30", timeout=60)
|
||||||
|
except BaseException as e: # noqa: BLE001 — we want to observe it
|
||||||
|
result_holder["exception"] = type(e).__name__
|
||||||
|
|
||||||
|
t = threading.Thread(target=worker, daemon=True)
|
||||||
|
t.start()
|
||||||
|
# Wait until the subprocess actually exists. LocalEnvironment.execute
|
||||||
|
# does init_session() (one spawn) before the real command, so we need
|
||||||
|
# to wait until a sleep 30 is visible. Use pgrep-style lookup via
|
||||||
|
# /proc to find the bash process running our sleep.
|
||||||
|
deadline = time.monotonic() + 5.0
|
||||||
|
target_pid = None
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
# Walk our children and grand-children to find one running 'sleep 30'
|
||||||
|
try:
|
||||||
|
import psutil # optional — fall back if absent
|
||||||
|
for p in psutil.Process(os.getpid()).children(recursive=True):
|
||||||
|
try:
|
||||||
|
if "sleep 30" in " ".join(p.cmdline()):
|
||||||
|
target_pid = p.pid
|
||||||
|
break
|
||||||
|
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||||
|
continue
|
||||||
|
except ImportError:
|
||||||
|
# Fall back to ps
|
||||||
|
ps = subprocess.run(
|
||||||
|
["ps", "-eo", "pid,ppid,pgid,cmd"], capture_output=True, text=True,
|
||||||
|
)
|
||||||
|
for line in ps.stdout.splitlines():
|
||||||
|
if "sleep 30" in line and "grep" not in line:
|
||||||
|
parts = line.split()
|
||||||
|
if parts and parts[0].isdigit():
|
||||||
|
target_pid = int(parts[0])
|
||||||
|
break
|
||||||
|
if target_pid:
|
||||||
|
break
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
assert target_pid is not None, (
|
||||||
|
"test setup: couldn't find 'sleep 30' subprocess after 5 s"
|
||||||
|
)
|
||||||
|
pgid = os.getpgid(target_pid)
|
||||||
|
assert _pgid_still_alive(pgid), "sanity: subprocess should be alive"
|
||||||
|
|
||||||
|
# Now inject a KeyboardInterrupt into the worker thread the same
|
||||||
|
# way CPython's signal machinery would. We use ctypes.PyThreadState_SetAsyncExc
|
||||||
|
# which is how signal delivery to non-main threads is simulated.
|
||||||
|
import ctypes
|
||||||
|
import sys as _sys
|
||||||
|
# py-thread-state exception targets need the ident, not the Thread
|
||||||
|
tid = t.ident
|
||||||
|
assert tid is not None
|
||||||
|
# Fire KeyboardInterrupt into the worker thread
|
||||||
|
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
|
||||||
|
ctypes.c_ulong(tid), ctypes.py_object(KeyboardInterrupt),
|
||||||
|
)
|
||||||
|
assert ret == 1, f"SetAsyncExc returned {ret}, expected 1"
|
||||||
|
|
||||||
|
# Give the worker a moment to: hit the exception at the next poll,
|
||||||
|
# run the except-block cleanup (_kill_process), and exit.
|
||||||
|
t.join(timeout=5.0)
|
||||||
|
assert not t.is_alive(), "worker didn't exit within 5 s of the interrupt"
|
||||||
|
|
||||||
|
# The critical assertion: the subprocess GROUP must be dead. Not
|
||||||
|
# just the bash wrapper — the 'sleep 30' child too.
|
||||||
|
# Give the SIGTERM+1s wait+SIGKILL escalation a moment to complete.
|
||||||
|
deadline = time.monotonic() + 3.0
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
if not _pgid_still_alive(pgid):
|
||||||
|
break
|
||||||
|
time.sleep(0.1)
|
||||||
|
assert not _pgid_still_alive(pgid), (
|
||||||
|
f"subprocess group {pgid} is STILL ALIVE after worker received "
|
||||||
|
f"KeyboardInterrupt — orphan bug regressed. This is the "
|
||||||
|
f"sleep-300-survives-SIGTERM scenario from Physikal's Apr 2026 "
|
||||||
|
f"report. See tools/environments/base.py _wait_for_process "
|
||||||
|
f"except-block."
|
||||||
|
)
|
||||||
|
# And the worker should have observed the KeyboardInterrupt (i.e.
|
||||||
|
# it re-raised cleanly, not silently swallowed).
|
||||||
|
assert result_holder.get("exception") == "KeyboardInterrupt", (
|
||||||
|
f"worker result: {result_holder!r} — expected KeyboardInterrupt "
|
||||||
|
f"propagation after cleanup"
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
env.cleanup()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
@@ -23,6 +23,19 @@ from tools.interrupt import is_interrupted
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Opt-in debug tracing for the interrupt/activity/poll machinery. Set
|
||||||
|
# HERMES_DEBUG_INTERRUPT=1 to log loop entry/exit, periodic heartbeats, and
|
||||||
|
# every is_interrupted() state change from _wait_for_process. Off by default
|
||||||
|
# to avoid flooding production gateway logs.
|
||||||
|
_DEBUG_INTERRUPT = bool(os.getenv("HERMES_DEBUG_INTERRUPT"))
|
||||||
|
|
||||||
|
if _DEBUG_INTERRUPT:
|
||||||
|
# AIAgent's quiet_mode path (run_agent.py) forces the `tools` logger to
|
||||||
|
# ERROR on CLI startup, which would silently swallow every trace we emit.
|
||||||
|
# Force this module's own logger back to INFO so the trace is visible in
|
||||||
|
# agent.log regardless of quiet-mode. Scoped to the opt-in case only.
|
||||||
|
logger.setLevel(logging.INFO)
|
||||||
|
|
||||||
# Thread-local activity callback. The agent sets this before a tool call so
|
# Thread-local activity callback. The agent sets this before a tool call so
|
||||||
# long-running _wait_for_process loops can report liveness to the gateway.
|
# long-running _wait_for_process loops can report liveness to the gateway.
|
||||||
_activity_callback_local = threading.local()
|
_activity_callback_local = threading.local()
|
||||||
|
|
@ -413,6 +426,13 @@ class BaseEnvironment(ABC):
|
||||||
Fires the ``activity_callback`` (if set on this instance) every 10s
|
Fires the ``activity_callback`` (if set on this instance) every 10s
|
||||||
while the process is running so the gateway's inactivity timeout
|
while the process is running so the gateway's inactivity timeout
|
||||||
doesn't kill long-running commands.
|
doesn't kill long-running commands.
|
||||||
|
|
||||||
|
Also wraps the poll loop in a ``try/finally`` that guarantees we
|
||||||
|
call ``self._kill_process(proc)`` if we exit via ``KeyboardInterrupt``
|
||||||
|
or ``SystemExit``. Without this, the local backend (which spawns
|
||||||
|
subprocesses with ``os.setsid`` into their own process group) leaves
|
||||||
|
an orphan with ``PPID=1`` when python is shut down mid-tool — the
|
||||||
|
``sleep 300``-survives-30-min bug Physikal and I both hit.
|
||||||
"""
|
"""
|
||||||
output_chunks: list[str] = []
|
output_chunks: list[str] = []
|
||||||
|
|
||||||
|
|
@ -437,8 +457,35 @@ class BaseEnvironment(ABC):
|
||||||
"start": _now,
|
"start": _now,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# --- Debug tracing (opt-in via HERMES_DEBUG_INTERRUPT=1) -------------
|
||||||
|
# Captures loop entry/exit, interrupt state changes, and periodic
|
||||||
|
# heartbeats so we can diagnose "agent never sees the interrupt"
|
||||||
|
# reports without reproducing locally.
|
||||||
|
_tid = threading.current_thread().ident
|
||||||
|
_pid = getattr(proc, "pid", None)
|
||||||
|
_iter_count = 0
|
||||||
|
_last_heartbeat = _now
|
||||||
|
_last_interrupt_state = False
|
||||||
|
_cb_was_none = _get_activity_callback() is None
|
||||||
|
if _DEBUG_INTERRUPT:
|
||||||
|
logger.info(
|
||||||
|
"[interrupt-debug] _wait_for_process ENTER tid=%s pid=%s "
|
||||||
|
"timeout=%ss activity_cb=%s initial_interrupt=%s",
|
||||||
|
_tid, _pid, timeout,
|
||||||
|
"set" if not _cb_was_none else "MISSING",
|
||||||
|
is_interrupted(),
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
while proc.poll() is None:
|
while proc.poll() is None:
|
||||||
|
_iter_count += 1
|
||||||
if is_interrupted():
|
if is_interrupted():
|
||||||
|
if _DEBUG_INTERRUPT:
|
||||||
|
logger.info(
|
||||||
|
"[interrupt-debug] _wait_for_process INTERRUPT DETECTED "
|
||||||
|
"tid=%s pid=%s iter=%d elapsed=%.1fs — killing process group",
|
||||||
|
_tid, _pid, _iter_count, time.monotonic() - _activity_state["start"],
|
||||||
|
)
|
||||||
self._kill_process(proc)
|
self._kill_process(proc)
|
||||||
drain_thread.join(timeout=2)
|
drain_thread.join(timeout=2)
|
||||||
return {
|
return {
|
||||||
|
|
@ -446,6 +493,12 @@ class BaseEnvironment(ABC):
|
||||||
"returncode": 130,
|
"returncode": 130,
|
||||||
}
|
}
|
||||||
if time.monotonic() > deadline:
|
if time.monotonic() > deadline:
|
||||||
|
if _DEBUG_INTERRUPT:
|
||||||
|
logger.info(
|
||||||
|
"[interrupt-debug] _wait_for_process TIMEOUT "
|
||||||
|
"tid=%s pid=%s iter=%d timeout=%ss",
|
||||||
|
_tid, _pid, _iter_count, timeout,
|
||||||
|
)
|
||||||
self._kill_process(proc)
|
self._kill_process(proc)
|
||||||
drain_thread.join(timeout=2)
|
drain_thread.join(timeout=2)
|
||||||
partial = "".join(output_chunks)
|
partial = "".join(output_chunks)
|
||||||
|
|
@ -458,7 +511,47 @@ class BaseEnvironment(ABC):
|
||||||
}
|
}
|
||||||
# Periodic activity touch so the gateway knows we're alive
|
# Periodic activity touch so the gateway knows we're alive
|
||||||
touch_activity_if_due(_activity_state, "terminal command running")
|
touch_activity_if_due(_activity_state, "terminal command running")
|
||||||
|
|
||||||
|
# Heartbeat every ~30s: proves the loop is alive and reports
|
||||||
|
# the activity-callback state (thread-local, can get clobbered
|
||||||
|
# by nested tool calls or executor thread reuse).
|
||||||
|
if _DEBUG_INTERRUPT and time.monotonic() - _last_heartbeat >= 30.0:
|
||||||
|
_cb_now_none = _get_activity_callback() is None
|
||||||
|
logger.info(
|
||||||
|
"[interrupt-debug] _wait_for_process HEARTBEAT "
|
||||||
|
"tid=%s pid=%s iter=%d elapsed=%.0fs "
|
||||||
|
"interrupt=%s activity_cb=%s%s",
|
||||||
|
_tid, _pid, _iter_count,
|
||||||
|
time.monotonic() - _activity_state["start"],
|
||||||
|
is_interrupted(),
|
||||||
|
"set" if not _cb_now_none else "MISSING",
|
||||||
|
" (LOST during run)" if _cb_now_none and not _cb_was_none else "",
|
||||||
|
)
|
||||||
|
_last_heartbeat = time.monotonic()
|
||||||
|
_cb_was_none = _cb_now_none
|
||||||
|
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
|
except (KeyboardInterrupt, SystemExit):
|
||||||
|
# Signal arrived (SIGTERM/SIGHUP/SIGINT) or sys.exit() was called
|
||||||
|
# while we were polling. The local backend spawns subprocesses
|
||||||
|
# with os.setsid, which puts them in their own process group — so
|
||||||
|
# if we let the interrupt propagate without killing the child,
|
||||||
|
# python exits and the child is reparented to init (PPID=1) and
|
||||||
|
# keeps running as an orphan. Killing the process group here
|
||||||
|
# guarantees the tool's side effects stop when the agent stops.
|
||||||
|
if _DEBUG_INTERRUPT:
|
||||||
|
logger.info(
|
||||||
|
"[interrupt-debug] _wait_for_process EXCEPTION_EXIT "
|
||||||
|
"tid=%s pid=%s iter=%d elapsed=%.1fs — killing subprocess group before re-raise",
|
||||||
|
_tid, _pid, _iter_count,
|
||||||
|
time.monotonic() - _activity_state["start"],
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
self._kill_process(proc)
|
||||||
|
drain_thread.join(timeout=2)
|
||||||
|
except Exception:
|
||||||
|
pass # cleanup is best-effort
|
||||||
|
raise
|
||||||
|
|
||||||
drain_thread.join(timeout=5)
|
drain_thread.join(timeout=5)
|
||||||
|
|
||||||
|
|
@ -467,6 +560,15 @@ class BaseEnvironment(ABC):
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
if _DEBUG_INTERRUPT:
|
||||||
|
logger.info(
|
||||||
|
"[interrupt-debug] _wait_for_process EXIT (natural) "
|
||||||
|
"tid=%s pid=%s iter=%d elapsed=%.1fs returncode=%s",
|
||||||
|
_tid, _pid, _iter_count,
|
||||||
|
time.monotonic() - _activity_state["start"],
|
||||||
|
proc.returncode,
|
||||||
|
)
|
||||||
|
|
||||||
return {"output": "".join(output_chunks), "returncode": proc.returncode}
|
return {"output": "".join(output_chunks), "returncode": proc.returncode}
|
||||||
|
|
||||||
def _kill_process(self, proc: ProcessHandle):
|
def _kill_process(self, proc: ProcessHandle):
|
||||||
|
|
|
||||||
|
|
@ -14,8 +14,23 @@ Usage in tools:
|
||||||
return {"output": "[interrupted]", "returncode": 130}
|
return {"output": "[interrupted]", "returncode": 130}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Opt-in debug tracing — pairs with HERMES_DEBUG_INTERRUPT in
|
||||||
|
# tools/environments/base.py. Enables per-call logging of set/check so the
|
||||||
|
# caller thread, target thread, and current state are visible when
|
||||||
|
# diagnosing "interrupt signaled but tool never saw it" reports.
|
||||||
|
_DEBUG_INTERRUPT = bool(os.getenv("HERMES_DEBUG_INTERRUPT"))
|
||||||
|
|
||||||
|
if _DEBUG_INTERRUPT:
|
||||||
|
# AIAgent's quiet_mode path forces `tools` logger to ERROR on CLI startup.
|
||||||
|
# Force our own logger back to INFO so the trace is visible in agent.log.
|
||||||
|
logger.setLevel(logging.INFO)
|
||||||
|
|
||||||
# Set of thread idents that have been interrupted.
|
# Set of thread idents that have been interrupted.
|
||||||
_interrupted_threads: set[int] = set()
|
_interrupted_threads: set[int] = set()
|
||||||
_lock = threading.Lock()
|
_lock = threading.Lock()
|
||||||
|
|
@ -35,6 +50,13 @@ def set_interrupt(active: bool, thread_id: int | None = None) -> None:
|
||||||
_interrupted_threads.add(tid)
|
_interrupted_threads.add(tid)
|
||||||
else:
|
else:
|
||||||
_interrupted_threads.discard(tid)
|
_interrupted_threads.discard(tid)
|
||||||
|
_snapshot = set(_interrupted_threads) if _DEBUG_INTERRUPT else None
|
||||||
|
if _DEBUG_INTERRUPT:
|
||||||
|
logger.info(
|
||||||
|
"[interrupt-debug] set_interrupt(active=%s, target_tid=%s) "
|
||||||
|
"called_from_tid=%s current_set=%s",
|
||||||
|
active, tid, threading.current_thread().ident, _snapshot,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def is_interrupted() -> bool:
|
def is_interrupted() -> bool:
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue