hermes-agent/tools/interrupt.py
Teknium 20f2258f34
fix(interrupt): propagate to concurrent-tool workers + opt-in debug trace (#11907)
* fix(interrupt): propagate to concurrent-tool workers + opt-in debug trace

interrupt() previously only flagged the agent's _execution_thread_id.
Tools running inside _execute_tool_calls_concurrent execute on
ThreadPoolExecutor worker threads whose tids are distinct from the
agent's, so is_interrupted() inside those tools returned False no matter
how many times the gateway called .interrupt() — hung ssh / curl / long
make-builds ran to their own timeout.

Changes:
- run_agent.py: track concurrent-tool worker tids in a per-agent set,
  fan interrupt()/clear_interrupt() out to them, and handle the
  register-after-interrupt race at _run_tool entry.  getattr fallback
  for the tracker so test stubs built via object.__new__ keep working.
- tools/environments/base.py: opt-in _wait_for_process trace (ENTER,
  per-30s HEARTBEAT with interrupt+activity-cb state, INTERRUPT
  DETECTED, TIMEOUT, EXIT) behind HERMES_DEBUG_INTERRUPT=1.
- tools/interrupt.py: opt-in set_interrupt() trace (caller tid, target
  tid, set snapshot) behind the same env flag.
- tests: new regression test runs a polling tool on a concurrent worker
  and asserts is_interrupted() flips to True within ~1s of interrupt().
  Second new test guards clear_interrupt() clearing tracked worker bits.

Validation: tests/run_agent/ all 762 pass; tests/tools/ interrupt+env
subset 216 pass.

* fix(interrupt-debug): bypass quiet_mode logger filter so trace reaches agent.log

AIAgent.__init__ sets logging.getLogger('tools').setLevel(ERROR) when
quiet_mode=True (the CLI default). This would silently swallow every
INFO-level trace line from the HERMES_DEBUG_INTERRUPT=1 instrumentation
added in the parent commit — confirmed by running hermes chat -q with
the flag and finding zero trace lines in agent.log even though
_wait_for_process was clearly executing (subprocess pid existed).

Fix: when HERMES_DEBUG_INTERRUPT=1, each traced module explicitly sets
its own logger level to INFO at import time, overriding the 'tools'
parent-level filter. Scoped to the opt-in case only, so production
(quiet_mode default) logs stay quiet as designed.

Validation: hermes chat -q with HERMES_DEBUG_INTERRUPT=1 now writes
'_wait_for_process ENTER/EXIT' lines to agent.log as expected.

* fix(cli): SIGTERM/SIGHUP no longer orphans tool subprocesses

Tool subprocesses spawned by the local environment backend use
os.setsid so they run in their own process group. Before this fix,
SIGTERM/SIGHUP to the hermes CLI killed the main thread via
KeyboardInterrupt but the worker thread running _wait_for_process
never got a chance to call _kill_process — Python exited, the child
was reparented to init (PPID=1), and the subprocess ran to its
natural end (confirmed live: sleep 300 survived 4+ min after SIGTERM
to the agent until manual cleanup).

Changes:
- cli.py _signal_handler (interactive) + _signal_handler_q (-q mode):
  route SIGTERM/SIGHUP through agent.interrupt() so the worker's poll
  loop sees the per-thread interrupt flag and calls _kill_process
  (os.killpg) on the subprocess group. HERMES_SIGTERM_GRACE (default
  1.5s) gives the worker time to complete its SIGTERM+SIGKILL
  escalation before KeyboardInterrupt unwinds main.
- tools/environments/base.py _wait_for_process: wrap the poll loop in
  try/except (KeyboardInterrupt, SystemExit) so the cleanup fires
  even on paths the signal handlers don't cover (direct sys.exit,
  unhandled KI from nested code, etc.). Emits EXCEPTION_EXIT trace
  line when HERMES_DEBUG_INTERRUPT=1.
- New regression test: injects KeyboardInterrupt into a running
  _wait_for_process via PyThreadState_SetAsyncExc, verifies the
  subprocess process group is dead within 3s of the exception and
  that KeyboardInterrupt re-raises cleanly afterward.

Validation:
| Before                                                  | After              |
|---------------------------------------------------------|--------------------|
| sleep 300 survives 4+ min as PPID=1 orphan after SIGTERM | dies within 2 s   |
| No INTERRUPT DETECTED in trace                          | INTERRUPT DETECTED fires + killing process group |
| tests/tools/test_local_interrupt_cleanup                | 1/1 pass          |
| tests/run_agent/test_concurrent_interrupt               | 4/4 pass          |
2026-04-17 20:39:25 -07:00

98 lines
3.5 KiB
Python

"""Per-thread interrupt signaling for all tools.
Provides thread-scoped interrupt tracking so that interrupting one agent
session does not kill tools running in other sessions. This is critical
in the gateway where multiple agents run concurrently in the same process.
The agent stores its execution thread ID at the start of run_conversation()
and passes it to set_interrupt()/clear_interrupt(). Tools call
is_interrupted() which checks the CURRENT thread — no argument needed.
Usage in tools:
from tools.interrupt import is_interrupted
if is_interrupted():
return {"output": "[interrupted]", "returncode": 130}
"""
import logging
import os
import threading
logger = logging.getLogger(__name__)
# Opt-in debug tracing — pairs with HERMES_DEBUG_INTERRUPT in
# tools/environments/base.py. Enables per-call logging of set/check so the
# caller thread, target thread, and current state are visible when
# diagnosing "interrupt signaled but tool never saw it" reports.
_DEBUG_INTERRUPT = bool(os.getenv("HERMES_DEBUG_INTERRUPT"))
if _DEBUG_INTERRUPT:
# AIAgent's quiet_mode path forces `tools` logger to ERROR on CLI startup.
# Force our own logger back to INFO so the trace is visible in agent.log.
logger.setLevel(logging.INFO)
# Set of thread idents that have been interrupted.
_interrupted_threads: set[int] = set()
_lock = threading.Lock()
def set_interrupt(active: bool, thread_id: int | None = None) -> None:
"""Set or clear interrupt for a specific thread.
Args:
active: True to signal interrupt, False to clear it.
thread_id: Target thread ident. When None, targets the
current thread (backward compat for CLI/tests).
"""
tid = thread_id if thread_id is not None else threading.current_thread().ident
with _lock:
if active:
_interrupted_threads.add(tid)
else:
_interrupted_threads.discard(tid)
_snapshot = set(_interrupted_threads) if _DEBUG_INTERRUPT else None
if _DEBUG_INTERRUPT:
logger.info(
"[interrupt-debug] set_interrupt(active=%s, target_tid=%s) "
"called_from_tid=%s current_set=%s",
active, tid, threading.current_thread().ident, _snapshot,
)
def is_interrupted() -> bool:
"""Check if an interrupt has been requested for the current thread.
Safe to call from any thread — each thread only sees its own
interrupt state.
"""
tid = threading.current_thread().ident
with _lock:
return tid in _interrupted_threads
# ---------------------------------------------------------------------------
# Backward-compatible _interrupt_event proxy
# ---------------------------------------------------------------------------
# Some legacy call sites (code_execution_tool, process_registry, tests)
# import _interrupt_event directly and call .is_set() / .set() / .clear().
# This shim maps those calls to the per-thread functions above so existing
# code keeps working while the underlying mechanism is thread-scoped.
class _ThreadAwareEventProxy:
"""Drop-in proxy that maps threading.Event methods to per-thread state."""
def is_set(self) -> bool:
return is_interrupted()
def set(self) -> None: # noqa: A003
set_interrupt(True)
def clear(self) -> None:
set_interrupt(False)
def wait(self, timeout: float | None = None) -> bool:
"""Not truly supported — returns current state immediately."""
return self.is_set()
_interrupt_event = _ThreadAwareEventProxy()