From 20f2258f3481e708fc954034ee36e2c72bce1782 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Fri, 17 Apr 2026 20:39:25 -0700 Subject: [PATCH] fix(interrupt): propagate to concurrent-tool workers + opt-in debug trace (#11907) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(interrupt): propagate to concurrent-tool workers + opt-in debug trace interrupt() previously only flagged the agent's _execution_thread_id. Tools running inside _execute_tool_calls_concurrent execute on ThreadPoolExecutor worker threads whose tids are distinct from the agent's, so is_interrupted() inside those tools returned False no matter how many times the gateway called .interrupt() — hung ssh / curl / long make-builds ran to their own timeout. Changes: - run_agent.py: track concurrent-tool worker tids in a per-agent set, fan interrupt()/clear_interrupt() out to them, and handle the register-after-interrupt race at _run_tool entry. getattr fallback for the tracker so test stubs built via object.__new__ keep working. - tools/environments/base.py: opt-in _wait_for_process trace (ENTER, per-30s HEARTBEAT with interrupt+activity-cb state, INTERRUPT DETECTED, TIMEOUT, EXIT) behind HERMES_DEBUG_INTERRUPT=1. - tools/interrupt.py: opt-in set_interrupt() trace (caller tid, target tid, set snapshot) behind the same env flag. - tests: new regression test runs a polling tool on a concurrent worker and asserts is_interrupted() flips to True within ~1s of interrupt(). Second new test guards clear_interrupt() clearing tracked worker bits. Validation: tests/run_agent/ all 762 pass; tests/tools/ interrupt+env subset 216 pass. * fix(interrupt-debug): bypass quiet_mode logger filter so trace reaches agent.log AIAgent.__init__ sets logging.getLogger('tools').setLevel(ERROR) when quiet_mode=True (the CLI default). This would silently swallow every INFO-level trace line from the HERMES_DEBUG_INTERRUPT=1 instrumentation added in the parent commit — confirmed by running hermes chat -q with the flag and finding zero trace lines in agent.log even though _wait_for_process was clearly executing (subprocess pid existed). Fix: when HERMES_DEBUG_INTERRUPT=1, each traced module explicitly sets its own logger level to INFO at import time, overriding the 'tools' parent-level filter. Scoped to the opt-in case only, so production (quiet_mode default) logs stay quiet as designed. Validation: hermes chat -q with HERMES_DEBUG_INTERRUPT=1 now writes '_wait_for_process ENTER/EXIT' lines to agent.log as expected. * fix(cli): SIGTERM/SIGHUP no longer orphans tool subprocesses Tool subprocesses spawned by the local environment backend use os.setsid so they run in their own process group. Before this fix, SIGTERM/SIGHUP to the hermes CLI killed the main thread via KeyboardInterrupt but the worker thread running _wait_for_process never got a chance to call _kill_process — Python exited, the child was reparented to init (PPID=1), and the subprocess ran to its natural end (confirmed live: sleep 300 survived 4+ min after SIGTERM to the agent until manual cleanup). Changes: - cli.py _signal_handler (interactive) + _signal_handler_q (-q mode): route SIGTERM/SIGHUP through agent.interrupt() so the worker's poll loop sees the per-thread interrupt flag and calls _kill_process (os.killpg) on the subprocess group. HERMES_SIGTERM_GRACE (default 1.5s) gives the worker time to complete its SIGTERM+SIGKILL escalation before KeyboardInterrupt unwinds main. - tools/environments/base.py _wait_for_process: wrap the poll loop in try/except (KeyboardInterrupt, SystemExit) so the cleanup fires even on paths the signal handlers don't cover (direct sys.exit, unhandled KI from nested code, etc.). Emits EXCEPTION_EXIT trace line when HERMES_DEBUG_INTERRUPT=1. - New regression test: injects KeyboardInterrupt into a running _wait_for_process via PyThreadState_SetAsyncExc, verifies the subprocess process group is dead within 3s of the exception and that KeyboardInterrupt re-raises cleanly afterward. Validation: | Before | After | |---------------------------------------------------------|--------------------| | sleep 300 survives 4+ min as PPID=1 orphan after SIGTERM | dies within 2 s | | No INTERRUPT DETECTED in trace | INTERRUPT DETECTED fires + killing process group | | tests/tools/test_local_interrupt_cleanup | 1/1 pass | | tests/run_agent/test_concurrent_interrupt | 4/4 pass | --- cli.py | 69 ++++++++- run_agent.py | 72 +++++++++ tests/run_agent/test_concurrent_interrupt.py | 123 +++++++++++++++- tests/tools/test_local_interrupt_cleanup.py | 145 +++++++++++++++++++ tools/environments/base.py | 142 +++++++++++++++--- tools/interrupt.py | 22 +++ 6 files changed, 551 insertions(+), 22 deletions(-) create mode 100644 tests/tools/test_local_interrupt_cleanup.py diff --git a/cli.py b/cli.py index c0c17babc..2456c7754 100644 --- a/cli.py +++ b/cli.py @@ -10067,8 +10067,36 @@ class HermesCLI: # Register signal handlers for graceful shutdown on SSH disconnect / SIGTERM def _signal_handler(signum, frame): - """Handle SIGHUP/SIGTERM by triggering graceful cleanup.""" + """Handle SIGHUP/SIGTERM by triggering graceful cleanup. + + Calls ``self.agent.interrupt()`` first so the agent daemon + thread's poll loop sees the per-thread interrupt and kills the + tool's subprocess group via ``_kill_process`` (os.killpg). + Without this, the main thread dies from KeyboardInterrupt and + the daemon thread is killed with it — before it can run one + more poll iteration to clean up the subprocess, which was + spawned with ``os.setsid`` and therefore survives as an orphan + with PPID=1. + + Grace window (``HERMES_SIGTERM_GRACE``, default 1.5 s) gives + the daemon time to: detect the interrupt (next 200 ms poll) → + call _kill_process (SIGTERM + 1 s wait + SIGKILL if needed) → + return from _wait_for_process. ``time.sleep`` releases the + GIL so the daemon actually runs during the window. + """ logger.debug("Received signal %s, triggering graceful shutdown", signum) + try: + if getattr(self, "agent", None) and getattr(self, "_agent_running", False): + self.agent.interrupt(f"received signal {signum}") + import time as _t + try: + _grace = float(os.getenv("HERMES_SIGTERM_GRACE", "1.5")) + except (TypeError, ValueError): + _grace = 1.5 + if _grace > 0: + _t.sleep(_grace) + except Exception: + pass # never block signal handling raise KeyboardInterrupt() try: @@ -10371,6 +10399,45 @@ def main( # Register cleanup for single-query mode (interactive mode registers in run()) atexit.register(_run_cleanup) + + # Also install signal handlers in single-query / `-q` mode. Interactive + # mode registers its own inside HermesCLI.run(), but `-q` runs + # cli.agent.run_conversation() below and AIAgent spawns worker threads + # for tools — so when SIGTERM arrives on the main thread, raising + # KeyboardInterrupt only unwinds the main thread, not the worker + # running _wait_for_process. Python then exits, the child subprocess + # (spawned with os.setsid, its own process group) is reparented to + # init and keeps running as an orphan. + # + # Fix: route SIGTERM/SIGHUP through agent.interrupt() which sets the + # per-thread interrupt flag the worker's poll loop checks every 200 ms. + # Give the worker a grace window to call _kill_process (SIGTERM to the + # process group, then SIGKILL after 1 s), then raise KeyboardInterrupt + # so main unwinds normally. HERMES_SIGTERM_GRACE overrides the 1.5 s + # default for debugging. + def _signal_handler_q(signum, frame): + logger.debug("Received signal %s in single-query mode", signum) + try: + _agent = getattr(cli, "agent", None) + if _agent is not None: + _agent.interrupt(f"received signal {signum}") + import time as _t + try: + _grace = float(os.getenv("HERMES_SIGTERM_GRACE", "1.5")) + except (TypeError, ValueError): + _grace = 1.5 + if _grace > 0: + _t.sleep(_grace) + except Exception: + pass # never block signal handling + raise KeyboardInterrupt() + try: + import signal as _signal + _signal.signal(_signal.SIGTERM, _signal_handler_q) + if hasattr(_signal, "SIGHUP"): + _signal.signal(_signal.SIGHUP, _signal_handler_q) + except Exception: + pass # signal handler may fail in restricted environments # Handle single query mode if query or image: diff --git a/run_agent.py b/run_agent.py index ef90ae39e..010715280 100644 --- a/run_agent.py +++ b/run_agent.py @@ -831,6 +831,16 @@ class AIAgent: self._execution_thread_id: int | None = None # Set at run_conversation() start self._interrupt_thread_signal_pending = False self._client_lock = threading.RLock() + + # Concurrent-tool worker thread tracking. `_execute_tool_calls_concurrent` + # runs each tool on its own ThreadPoolExecutor worker — those worker + # threads have tids distinct from `_execution_thread_id`, so + # `_set_interrupt(True, _execution_thread_id)` alone does NOT cause + # `is_interrupted()` inside the worker to return True. Track the + # workers here so `interrupt()` / `clear_interrupt()` can fan out to + # their tids explicitly. + self._tool_worker_threads: set[int] = set() + self._tool_worker_threads_lock = threading.Lock() # Subagent delegation state self._delegate_depth = 0 # 0 = top-level agent, incremented for children @@ -3191,6 +3201,25 @@ class AIAgent: # interrupt signal until startup completes instead of targeting # the caller thread by mistake. self._interrupt_thread_signal_pending = True + # Fan out to concurrent-tool worker threads. Those workers run tools + # on their own tids (ThreadPoolExecutor workers), so `is_interrupted()` + # inside a tool only sees an interrupt when their specific tid is in + # the `_interrupted_threads` set. Without this propagation, an + # already-running concurrent tool (e.g. a terminal command hung on + # network I/O) never notices the interrupt and has to run to its own + # timeout. See `_run_tool` for the matching entry/exit bookkeeping. + # `getattr` fallback covers test stubs that build AIAgent via + # object.__new__ and skip __init__. + _tracker = getattr(self, "_tool_worker_threads", None) + _tracker_lock = getattr(self, "_tool_worker_threads_lock", None) + if _tracker is not None and _tracker_lock is not None: + with _tracker_lock: + _worker_tids = list(_tracker) + for _wtid in _worker_tids: + try: + _set_interrupt(True, _wtid) + except Exception: + pass # Propagate interrupt to any running child agents (subagent delegation) with self._active_children_lock: children_copy = list(self._active_children) @@ -3209,6 +3238,23 @@ class AIAgent: self._interrupt_thread_signal_pending = False if self._execution_thread_id is not None: _set_interrupt(False, self._execution_thread_id) + # Also clear any concurrent-tool worker thread bits. Tracked + # workers normally clear their own bit on exit, but an explicit + # clear here guarantees no stale interrupt can survive a turn + # boundary and fire on a subsequent, unrelated tool call that + # happens to get scheduled onto the same recycled worker tid. + # `getattr` fallback covers test stubs that build AIAgent via + # object.__new__ and skip __init__. + _tracker = getattr(self, "_tool_worker_threads", None) + _tracker_lock = getattr(self, "_tool_worker_threads_lock", None) + if _tracker is not None and _tracker_lock is not None: + with _tracker_lock: + _worker_tids = list(_tracker) + for _wtid in _worker_tids: + try: + _set_interrupt(False, _wtid) + except Exception: + pass def _touch_activity(self, desc: str) -> None: """Update the last-activity timestamp and description (thread-safe).""" @@ -7653,6 +7699,22 @@ class AIAgent: def _run_tool(index, tool_call, function_name, function_args): """Worker function executed in a thread.""" + # Register this worker tid so the agent can fan out an interrupt + # to it — see AIAgent.interrupt(). Must happen first thing, and + # must be paired with discard + clear in the finally block. + _worker_tid = threading.current_thread().ident + with self._tool_worker_threads_lock: + self._tool_worker_threads.add(_worker_tid) + # Race: if the agent was interrupted between fan-out (which + # snapshotted an empty/earlier set) and our registration, apply + # the interrupt to our own tid now so is_interrupted() inside + # the tool returns True on the next poll. + if self._interrupt_requested: + try: + from tools.interrupt import set_interrupt as _sif + _sif(True, _worker_tid) + except Exception: + pass # Set the activity callback on THIS worker thread so # _wait_for_process (terminal commands) can fire heartbeats. # The callback is thread-local; the main thread's callback @@ -7675,6 +7737,16 @@ class AIAgent: else: logger.info("tool %s completed (%.2fs, %d chars)", function_name, duration, len(result)) results[index] = (function_name, function_args, result, duration, is_error) + # Tear down worker-tid tracking. Clear any interrupt bit we may + # have set so the next task scheduled onto this recycled tid + # starts with a clean slate. + with self._tool_worker_threads_lock: + self._tool_worker_threads.discard(_worker_tid) + try: + from tools.interrupt import set_interrupt as _sif + _sif(False, _worker_tid) + except Exception: + pass # Start spinner for CLI mode (skip when TUI handles tool progress) spinner = None diff --git a/tests/run_agent/test_concurrent_interrupt.py b/tests/run_agent/test_concurrent_interrupt.py index fdeb8dd69..e5d8b88e7 100644 --- a/tests/run_agent/test_concurrent_interrupt.py +++ b/tests/run_agent/test_concurrent_interrupt.py @@ -23,6 +23,10 @@ def _make_agent(monkeypatch): class _Stub: _interrupt_requested = False + _interrupt_message = None + # Bind to this thread's ident so interrupt() targets a real tid. + _execution_thread_id = threading.current_thread().ident + _interrupt_thread_signal_pending = False log_prefix = "" quiet_mode = True verbose_logging = False @@ -40,6 +44,15 @@ def _make_agent(monkeypatch): _current_tool = None _last_activity = 0 _print_fn = print + # Worker-thread tracking state mirrored from AIAgent.__init__ so the + # real interrupt() method can fan out to concurrent-tool workers. + _active_children: list = [] + + def __init__(self): + # Instance-level (not class-level) so each test gets a fresh set. + self._tool_worker_threads: set = set() + self._tool_worker_threads_lock = threading.Lock() + self._active_children_lock = threading.Lock() def _touch_activity(self, desc): self._last_activity = time.time() @@ -60,8 +73,10 @@ def _make_agent(monkeypatch): return False stub = _Stub() - # Bind the real methods + # Bind the real methods under test stub._execute_tool_calls_concurrent = _ra.AIAgent._execute_tool_calls_concurrent.__get__(stub) + stub.interrupt = _ra.AIAgent.interrupt.__get__(stub) + stub.clear_interrupt = _ra.AIAgent.clear_interrupt.__get__(stub) stub._invoke_tool = MagicMock(side_effect=lambda *a, **kw: '{"ok": true}') return stub @@ -137,3 +152,109 @@ def test_concurrent_preflight_interrupt_skips_all(monkeypatch): assert "skipped due to user interrupt" in messages[1]["content"] # _invoke_tool should never have been called agent._invoke_tool.assert_not_called() + + +def test_running_concurrent_worker_sees_is_interrupted(monkeypatch): + """Regression guard for the "interrupt-doesn't-reach-hung-tool" class of + bug Physikal reported in April 2026. + + Before this fix, `AIAgent.interrupt()` called `_set_interrupt(True, + _execution_thread_id)` — which only flagged the agent's *main* thread. + Tools running inside `_execute_tool_calls_concurrent` execute on + ThreadPoolExecutor worker threads whose tids are NOT the agent's, so + `is_interrupted()` (which checks the *current* thread's tid) returned + False inside those tools no matter how many times the gateway called + `.interrupt()`. Hung ssh / long curl / big make-build tools would run + to their own timeout. + + This test runs a fake tool in the concurrent path that polls + `is_interrupted()` like a real terminal command does, then calls + `agent.interrupt()` from another thread, and asserts the poll sees True + within one second. + """ + from tools.interrupt import is_interrupted + + agent = _make_agent(monkeypatch) + + # Counter plus observation hooks so we can prove the worker saw the flip. + observed = {"saw_true": False, "poll_count": 0, "worker_tid": None} + worker_started = threading.Event() + + def polling_tool(name, args, task_id, call_id=None): + observed["worker_tid"] = threading.current_thread().ident + worker_started.set() + deadline = time.monotonic() + 5.0 + while time.monotonic() < deadline: + observed["poll_count"] += 1 + if is_interrupted(): + observed["saw_true"] = True + return '{"interrupted": true}' + time.sleep(0.05) + return '{"timed_out": true}' + + agent._invoke_tool = MagicMock(side_effect=polling_tool) + + tc1 = _FakeToolCall("hung_fake_tool_1", call_id="tc1") + tc2 = _FakeToolCall("hung_fake_tool_2", call_id="tc2") + msg = _FakeAssistantMsg([tc1, tc2]) + messages = [] + + def _interrupt_after_start(): + # Wait until at least one worker is running so its tid is tracked. + worker_started.wait(timeout=2.0) + time.sleep(0.2) # let the other worker enter too + agent.interrupt("stop requested by test") + + t = threading.Thread(target=_interrupt_after_start) + t.start() + start = time.monotonic() + agent._execute_tool_calls_concurrent(msg, messages, "test_task") + elapsed = time.monotonic() - start + t.join(timeout=2.0) + + # The worker must have actually polled is_interrupted — otherwise the + # test isn't exercising what it claims to. + assert observed["poll_count"] > 0, ( + "polling_tool never ran — test scaffold issue" + ) + # The worker must see the interrupt within ~1 s of agent.interrupt() + # being called. Before the fix this loop ran until its 5 s own-timeout. + assert observed["saw_true"], ( + f"is_interrupted() never returned True inside the concurrent worker " + f"after agent.interrupt() — interrupt-propagation hole regressed. " + f"worker_tid={observed['worker_tid']!r} poll_count={observed['poll_count']}" + ) + assert elapsed < 3.0, ( + f"concurrent execution took {elapsed:.2f}s after interrupt — the fan-out " + f"to worker tids didn't shortcut the tool's poll loop as expected" + ) + # Also verify cleanup: no stale worker tids should remain after all + # tools finished. + assert agent._tool_worker_threads == set(), ( + f"worker tids leaked after run: {agent._tool_worker_threads}" + ) + + +def test_clear_interrupt_clears_worker_tids(monkeypatch): + """After clear_interrupt(), stale worker-tid bits must be cleared so the + next turn's tools — which may be scheduled onto recycled tids — don't + see a false interrupt.""" + from tools.interrupt import is_interrupted, set_interrupt + + agent = _make_agent(monkeypatch) + # Simulate a worker having registered but not yet exited cleanly (e.g. a + # hypothetical bug in the tear-down). Put a fake tid in the set and + # flag it interrupted. + fake_tid = threading.current_thread().ident # use real tid so is_interrupted can see it + with agent._tool_worker_threads_lock: + agent._tool_worker_threads.add(fake_tid) + set_interrupt(True, fake_tid) + assert is_interrupted() is True # sanity + + agent.clear_interrupt() + + assert is_interrupted() is False, ( + "clear_interrupt() did not clear the interrupt bit for a tracked " + "worker tid — stale interrupt can leak into the next turn" + ) + diff --git a/tests/tools/test_local_interrupt_cleanup.py b/tests/tools/test_local_interrupt_cleanup.py new file mode 100644 index 000000000..72310009a --- /dev/null +++ b/tests/tools/test_local_interrupt_cleanup.py @@ -0,0 +1,145 @@ +"""Regression tests for _wait_for_process subprocess cleanup on exception exit. + +When the poll loop exits via KeyboardInterrupt or SystemExit (SIGTERM via +cli.py signal handler, SIGINT on the main thread in non-interactive -q mode, +or explicit sys.exit from some caller), the child subprocess must be killed +before the exception propagates — otherwise the local backend's use of +os.setsid leaves an orphan with PPID=1. + +The live repro that motivated this: hermes chat -q ... 'sleep 300', SIGTERM +to the python process, sleep 300 survived with PPID=1 for the full 300 s +because _wait_for_process never got to call _kill_process before python +died. See commit message for full context. +""" +import os +import signal +import subprocess +import threading +import time + +import pytest + +from tools.environments.local import LocalEnvironment + + +@pytest.fixture(autouse=True) +def _isolate_hermes_home(tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + (tmp_path / "logs").mkdir(exist_ok=True) + + +def _pgid_still_alive(pgid: int) -> bool: + """Return True if any process in the given process group is still alive.""" + try: + os.killpg(pgid, 0) # signal 0 = existence check + return True + except ProcessLookupError: + return False + + +def test_wait_for_process_kills_subprocess_on_keyboardinterrupt(): + """When KeyboardInterrupt arrives mid-poll, the subprocess group must be + killed before the exception is re-raised.""" + env = LocalEnvironment(cwd="/tmp") + try: + result_holder = {} + proc_holder = {} + started = threading.Event() + raise_at = [None] # set by the main thread to tell worker when + + # Drive execute() on a separate thread so we can SIGNAL-interrupt it + # via a thread-targeted exception without killing our test process. + def worker(): + # Spawn a subprocess that will definitely be alive long enough + # to observe the cleanup, via env.execute(...) — the normal path + # that goes through _wait_for_process. + try: + result_holder["result"] = env.execute("sleep 30", timeout=60) + except BaseException as e: # noqa: BLE001 — we want to observe it + result_holder["exception"] = type(e).__name__ + + t = threading.Thread(target=worker, daemon=True) + t.start() + # Wait until the subprocess actually exists. LocalEnvironment.execute + # does init_session() (one spawn) before the real command, so we need + # to wait until a sleep 30 is visible. Use pgrep-style lookup via + # /proc to find the bash process running our sleep. + deadline = time.monotonic() + 5.0 + target_pid = None + while time.monotonic() < deadline: + # Walk our children and grand-children to find one running 'sleep 30' + try: + import psutil # optional — fall back if absent + for p in psutil.Process(os.getpid()).children(recursive=True): + try: + if "sleep 30" in " ".join(p.cmdline()): + target_pid = p.pid + break + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + except ImportError: + # Fall back to ps + ps = subprocess.run( + ["ps", "-eo", "pid,ppid,pgid,cmd"], capture_output=True, text=True, + ) + for line in ps.stdout.splitlines(): + if "sleep 30" in line and "grep" not in line: + parts = line.split() + if parts and parts[0].isdigit(): + target_pid = int(parts[0]) + break + if target_pid: + break + time.sleep(0.1) + + assert target_pid is not None, ( + "test setup: couldn't find 'sleep 30' subprocess after 5 s" + ) + pgid = os.getpgid(target_pid) + assert _pgid_still_alive(pgid), "sanity: subprocess should be alive" + + # Now inject a KeyboardInterrupt into the worker thread the same + # way CPython's signal machinery would. We use ctypes.PyThreadState_SetAsyncExc + # which is how signal delivery to non-main threads is simulated. + import ctypes + import sys as _sys + # py-thread-state exception targets need the ident, not the Thread + tid = t.ident + assert tid is not None + # Fire KeyboardInterrupt into the worker thread + ret = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_ulong(tid), ctypes.py_object(KeyboardInterrupt), + ) + assert ret == 1, f"SetAsyncExc returned {ret}, expected 1" + + # Give the worker a moment to: hit the exception at the next poll, + # run the except-block cleanup (_kill_process), and exit. + t.join(timeout=5.0) + assert not t.is_alive(), "worker didn't exit within 5 s of the interrupt" + + # The critical assertion: the subprocess GROUP must be dead. Not + # just the bash wrapper — the 'sleep 30' child too. + # Give the SIGTERM+1s wait+SIGKILL escalation a moment to complete. + deadline = time.monotonic() + 3.0 + while time.monotonic() < deadline: + if not _pgid_still_alive(pgid): + break + time.sleep(0.1) + assert not _pgid_still_alive(pgid), ( + f"subprocess group {pgid} is STILL ALIVE after worker received " + f"KeyboardInterrupt — orphan bug regressed. This is the " + f"sleep-300-survives-SIGTERM scenario from Physikal's Apr 2026 " + f"report. See tools/environments/base.py _wait_for_process " + f"except-block." + ) + # And the worker should have observed the KeyboardInterrupt (i.e. + # it re-raised cleanly, not silently swallowed). + assert result_holder.get("exception") == "KeyboardInterrupt", ( + f"worker result: {result_holder!r} — expected KeyboardInterrupt " + f"propagation after cleanup" + ) + finally: + try: + env.cleanup() + except Exception: + pass diff --git a/tools/environments/base.py b/tools/environments/base.py index 8e9907923..1bc08449e 100644 --- a/tools/environments/base.py +++ b/tools/environments/base.py @@ -23,6 +23,19 @@ from tools.interrupt import is_interrupted logger = logging.getLogger(__name__) +# Opt-in debug tracing for the interrupt/activity/poll machinery. Set +# HERMES_DEBUG_INTERRUPT=1 to log loop entry/exit, periodic heartbeats, and +# every is_interrupted() state change from _wait_for_process. Off by default +# to avoid flooding production gateway logs. +_DEBUG_INTERRUPT = bool(os.getenv("HERMES_DEBUG_INTERRUPT")) + +if _DEBUG_INTERRUPT: + # AIAgent's quiet_mode path (run_agent.py) forces the `tools` logger to + # ERROR on CLI startup, which would silently swallow every trace we emit. + # Force this module's own logger back to INFO so the trace is visible in + # agent.log regardless of quiet-mode. Scoped to the opt-in case only. + logger.setLevel(logging.INFO) + # Thread-local activity callback. The agent sets this before a tool call so # long-running _wait_for_process loops can report liveness to the gateway. _activity_callback_local = threading.local() @@ -413,6 +426,13 @@ class BaseEnvironment(ABC): Fires the ``activity_callback`` (if set on this instance) every 10s while the process is running so the gateway's inactivity timeout doesn't kill long-running commands. + + Also wraps the poll loop in a ``try/finally`` that guarantees we + call ``self._kill_process(proc)`` if we exit via ``KeyboardInterrupt`` + or ``SystemExit``. Without this, the local backend (which spawns + subprocesses with ``os.setsid`` into their own process group) leaves + an orphan with ``PPID=1`` when python is shut down mid-tool — the + ``sleep 300``-survives-30-min bug Physikal and I both hit. """ output_chunks: list[str] = [] @@ -437,28 +457,101 @@ class BaseEnvironment(ABC): "start": _now, } - while proc.poll() is None: - if is_interrupted(): + # --- Debug tracing (opt-in via HERMES_DEBUG_INTERRUPT=1) ------------- + # Captures loop entry/exit, interrupt state changes, and periodic + # heartbeats so we can diagnose "agent never sees the interrupt" + # reports without reproducing locally. + _tid = threading.current_thread().ident + _pid = getattr(proc, "pid", None) + _iter_count = 0 + _last_heartbeat = _now + _last_interrupt_state = False + _cb_was_none = _get_activity_callback() is None + if _DEBUG_INTERRUPT: + logger.info( + "[interrupt-debug] _wait_for_process ENTER tid=%s pid=%s " + "timeout=%ss activity_cb=%s initial_interrupt=%s", + _tid, _pid, timeout, + "set" if not _cb_was_none else "MISSING", + is_interrupted(), + ) + + try: + while proc.poll() is None: + _iter_count += 1 + if is_interrupted(): + if _DEBUG_INTERRUPT: + logger.info( + "[interrupt-debug] _wait_for_process INTERRUPT DETECTED " + "tid=%s pid=%s iter=%d elapsed=%.1fs — killing process group", + _tid, _pid, _iter_count, time.monotonic() - _activity_state["start"], + ) + self._kill_process(proc) + drain_thread.join(timeout=2) + return { + "output": "".join(output_chunks) + "\n[Command interrupted]", + "returncode": 130, + } + if time.monotonic() > deadline: + if _DEBUG_INTERRUPT: + logger.info( + "[interrupt-debug] _wait_for_process TIMEOUT " + "tid=%s pid=%s iter=%d timeout=%ss", + _tid, _pid, _iter_count, timeout, + ) + self._kill_process(proc) + drain_thread.join(timeout=2) + partial = "".join(output_chunks) + timeout_msg = f"\n[Command timed out after {timeout}s]" + return { + "output": partial + timeout_msg + if partial + else timeout_msg.lstrip(), + "returncode": 124, + } + # Periodic activity touch so the gateway knows we're alive + touch_activity_if_due(_activity_state, "terminal command running") + + # Heartbeat every ~30s: proves the loop is alive and reports + # the activity-callback state (thread-local, can get clobbered + # by nested tool calls or executor thread reuse). + if _DEBUG_INTERRUPT and time.monotonic() - _last_heartbeat >= 30.0: + _cb_now_none = _get_activity_callback() is None + logger.info( + "[interrupt-debug] _wait_for_process HEARTBEAT " + "tid=%s pid=%s iter=%d elapsed=%.0fs " + "interrupt=%s activity_cb=%s%s", + _tid, _pid, _iter_count, + time.monotonic() - _activity_state["start"], + is_interrupted(), + "set" if not _cb_now_none else "MISSING", + " (LOST during run)" if _cb_now_none and not _cb_was_none else "", + ) + _last_heartbeat = time.monotonic() + _cb_was_none = _cb_now_none + + time.sleep(0.2) + except (KeyboardInterrupt, SystemExit): + # Signal arrived (SIGTERM/SIGHUP/SIGINT) or sys.exit() was called + # while we were polling. The local backend spawns subprocesses + # with os.setsid, which puts them in their own process group — so + # if we let the interrupt propagate without killing the child, + # python exits and the child is reparented to init (PPID=1) and + # keeps running as an orphan. Killing the process group here + # guarantees the tool's side effects stop when the agent stops. + if _DEBUG_INTERRUPT: + logger.info( + "[interrupt-debug] _wait_for_process EXCEPTION_EXIT " + "tid=%s pid=%s iter=%d elapsed=%.1fs — killing subprocess group before re-raise", + _tid, _pid, _iter_count, + time.monotonic() - _activity_state["start"], + ) + try: self._kill_process(proc) drain_thread.join(timeout=2) - return { - "output": "".join(output_chunks) + "\n[Command interrupted]", - "returncode": 130, - } - if time.monotonic() > deadline: - self._kill_process(proc) - drain_thread.join(timeout=2) - partial = "".join(output_chunks) - timeout_msg = f"\n[Command timed out after {timeout}s]" - return { - "output": partial + timeout_msg - if partial - else timeout_msg.lstrip(), - "returncode": 124, - } - # Periodic activity touch so the gateway knows we're alive - touch_activity_if_due(_activity_state, "terminal command running") - time.sleep(0.2) + except Exception: + pass # cleanup is best-effort + raise drain_thread.join(timeout=5) @@ -467,6 +560,15 @@ class BaseEnvironment(ABC): except Exception: pass + if _DEBUG_INTERRUPT: + logger.info( + "[interrupt-debug] _wait_for_process EXIT (natural) " + "tid=%s pid=%s iter=%d elapsed=%.1fs returncode=%s", + _tid, _pid, _iter_count, + time.monotonic() - _activity_state["start"], + proc.returncode, + ) + return {"output": "".join(output_chunks), "returncode": proc.returncode} def _kill_process(self, proc: ProcessHandle): diff --git a/tools/interrupt.py b/tools/interrupt.py index 9bc8b83ae..ac784332f 100644 --- a/tools/interrupt.py +++ b/tools/interrupt.py @@ -14,8 +14,23 @@ Usage in tools: return {"output": "[interrupted]", "returncode": 130} """ +import logging +import os import threading +logger = logging.getLogger(__name__) + +# Opt-in debug tracing — pairs with HERMES_DEBUG_INTERRUPT in +# tools/environments/base.py. Enables per-call logging of set/check so the +# caller thread, target thread, and current state are visible when +# diagnosing "interrupt signaled but tool never saw it" reports. +_DEBUG_INTERRUPT = bool(os.getenv("HERMES_DEBUG_INTERRUPT")) + +if _DEBUG_INTERRUPT: + # AIAgent's quiet_mode path forces `tools` logger to ERROR on CLI startup. + # Force our own logger back to INFO so the trace is visible in agent.log. + logger.setLevel(logging.INFO) + # Set of thread idents that have been interrupted. _interrupted_threads: set[int] = set() _lock = threading.Lock() @@ -35,6 +50,13 @@ def set_interrupt(active: bool, thread_id: int | None = None) -> None: _interrupted_threads.add(tid) else: _interrupted_threads.discard(tid) + _snapshot = set(_interrupted_threads) if _DEBUG_INTERRUPT else None + if _DEBUG_INTERRUPT: + logger.info( + "[interrupt-debug] set_interrupt(active=%s, target_tid=%s) " + "called_from_tid=%s current_set=%s", + active, tid, threading.current_thread().ident, _snapshot, + ) def is_interrupted() -> bool: