diff --git a/tests/tools/test_process_registry.py b/tests/tools/test_process_registry.py index d981878a31..83059915e4 100644 --- a/tests/tools/test_process_registry.py +++ b/tests/tools/test_process_registry.py @@ -103,6 +103,134 @@ class TestGetAndPoll: assert result["exit_code"] == 0 +# ========================================================================= +# Orphaned-pipe reconciliation (issue #17327) +# ========================================================================= + +@pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only: uses setsid/fcntl") +class TestOrphanedPipeReconciliation: + """Regression tests for issue #17327. + + `hermes update` in Feishu spawned a background subprocess that restarted + the gateway; the direct child exited quickly but a descendant daemon + held the stdout pipe open. `_reader_loop.finally` never ran, so + `session.exited` stayed False and the agent polled 74 times over 7 + minutes, all returning `status: running`. + + The fix is `_reconcile_local_exit()`: poll() and wait() now check the + direct `Popen.poll()` before trusting `session.exited`. + """ + + def test_reconcile_flips_exited_when_direct_child_done(self, registry): + """Direct child exited but reader thread is blocked on orphaned pipe.""" + # Simulate the orphaned-pipe scenario: direct child exited, but a + # descendant holds stdout open so the reader never sees EOF. + # Approach: spawn `sh -c 'sleep 10 &'` with setsid — sh forks the + # sleep into a new session group, exits immediately, but sleep + # inherits the stdout pipe and keeps it open. + proc = subprocess.Popen( + ["sh", "-c", "exec 1>&2; ( sleep 30 ) & disown; exit 0"], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + preexec_fn=os.setsid, + ) + + s = _make_session(sid="proc_orphan_test") + s.process = proc + s.pid = proc.pid + registry._running[s.id] = s + + # Wait for the direct child to exit. We don't start a reader thread, + # so session.exited stays False (mimicking the stuck-reader state). + assert _wait_until(lambda: proc.poll() is not None, timeout=5.0), ( + "Direct child should exit quickly (sh exits, sleep descendant " + "holds the pipe open)" + ) + + # Before the fix: poll would return "running" forever. + # After the fix: poll reconciles against proc.poll() and flips. + assert s.exited is False # Precondition: reader hasn't updated it. + result = registry.poll(s.id) + assert result["status"] == "exited", ( + f"Expected reconciled 'exited' status; got {result!r}. " + "This is issue #17327 — reader is blocked on orphaned pipe." + ) + assert result["exit_code"] == 0 + assert s.exited is True + assert s.id in registry._finished + assert s.id not in registry._running + + # Clean up the orphaned descendant. + try: + os.killpg(os.getpgid(proc.pid), signal.SIGKILL) + except (ProcessLookupError, PermissionError): + pass + + def test_reconcile_noop_when_child_still_running(self, registry): + """Reconcile must NOT flip exited when the direct child is alive.""" + proc = _spawn_python_sleep(5.0) + s = _make_session(sid="proc_running_test") + s.process = proc + s.pid = proc.pid + registry._running[s.id] = s + + result = registry.poll(s.id) + assert result["status"] == "running" + assert s.exited is False + + proc.kill() + proc.wait() + + def test_reconcile_noop_on_already_exited(self, registry): + """Reconcile is a no-op when session.exited is already True.""" + s = _make_session(sid="proc_already_exited", exited=True, exit_code=7) + s.process = MagicMock() + s.process.poll = MagicMock(return_value=0) # Would say exit 0 + registry._finished[s.id] = s + + registry._reconcile_local_exit(s) + # Must not overwrite the existing exit_code with proc.poll()'s 0. + assert s.exit_code == 7 + + def test_reconcile_noop_on_no_process(self, registry): + """Reconcile is a no-op for sessions without a local Popen (env/PTY).""" + s = _make_session(sid="proc_no_popen") + assert getattr(s, "process", None) is None + # Must not raise. + registry._reconcile_local_exit(s) + assert s.exited is False + + def test_wait_returns_when_reader_blocked(self, registry): + """wait() must also reconcile — not just poll().""" + proc = subprocess.Popen( + ["sh", "-c", "( sleep 30 ) & disown; exit 0"], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + preexec_fn=os.setsid, + ) + + s = _make_session(sid="proc_wait_orphan") + s.process = proc + s.pid = proc.pid + registry._running[s.id] = s + + assert _wait_until(lambda: proc.poll() is not None, timeout=5.0) + + start = time.monotonic() + result = registry.wait(s.id, timeout=10) + elapsed = time.monotonic() - start + + assert result["status"] == "exited", result + assert elapsed < 5.0, ( + f"wait() should return ~immediately via reconcile; took {elapsed:.1f}s" + ) + + try: + os.killpg(os.getpgid(proc.pid), signal.SIGKILL) + except (ProcessLookupError, PermissionError): + pass + + # ========================================================================= # Read log # ========================================================================= diff --git a/tools/process_registry.py b/tools/process_registry.py index 479030120d..da5c8d224b 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -800,6 +800,78 @@ class ProcessRegistry: session = self._running.get(session_id) or self._finished.get(session_id) return self._refresh_detached_session(session) + def _reconcile_local_exit(self, session: "ProcessSession") -> None: + """Reconcile session.exited against the real child process state. + + The reader thread (`_reader_loop`) sets `session.exited = True` only + in its `finally` block, which runs when `stdout.read()` returns EOF. + If the direct `Popen` child has exited but a descendant process (e.g. + a daemon spawned by `hermes update` restarting the gateway) is still + holding the stdout pipe open, the reader blocks forever and poll() + keeps returning "running" indefinitely (issue #17327 — 74 polls over + 7 minutes on Feishu). + + This helper closes that window: when `session.exited` is still False + but the direct child's `Popen.poll()` reports an exit code, drain any + readable bytes non-blocking and flip `session.exited`. The orphaned + reader thread remains stuck on its blocking `read()` but is a daemon + thread and will be reaped with the process. + + Safe no-op on sessions without a local `Popen` (env/PTY), already- + exited sessions, and detached-recovered sessions. + """ + if session is None or session.exited: + return + proc = getattr(session, "process", None) + if proc is None: + return + try: + rc = proc.poll() + except Exception: + return + if rc is None: + return # Direct child still running — reader block is legitimate. + + # Direct child exited. Try to drain any bytes the reader hasn't + # consumed yet. This is best-effort: if the pipe is held open by a + # descendant, the non-blocking read returns what's immediately + # available and we stop. + drained = "" + stdout = getattr(proc, "stdout", None) + if stdout is not None and not _IS_WINDOWS: + try: + import fcntl + fd = stdout.fileno() + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + try: + chunk = stdout.read() + if chunk: + drained = chunk if isinstance(chunk, str) else chunk.decode("utf-8", errors="replace") + except (BlockingIOError, OSError, ValueError): + pass + finally: + try: + fcntl.fcntl(fd, fcntl.F_SETFL, flags) + except Exception: + pass + except Exception as e: + logger.debug("Non-blocking drain failed for %s: %s", session.id, e) + + with session._lock: + if drained: + session.output_buffer += drained + if len(session.output_buffer) > session.max_output_chars: + session.output_buffer = session.output_buffer[-session.max_output_chars:] + session.exited = True + session.exit_code = rc + logger.info( + "Reconciled session %s: direct child exited with code %s but reader " + "was still blocked (orphaned pipe). Flipped to exited.", + session.id, rc, + ) + self._move_to_finished(session) + def poll(self, session_id: str) -> dict: """Check status and get new output for a background process.""" from tools.ansi_strip import strip_ansi @@ -808,6 +880,10 @@ class ProcessRegistry: if session is None: return {"status": "not_found", "error": f"No process with ID {session_id}"} + # Reconcile against real child state before reading session.exited. + # Guards against orphaned-pipe reader hangs (issue #17327). + self._reconcile_local_exit(session) + with session._lock: output_preview = strip_ansi(session.output_buffer[-1000:]) if session.output_buffer else "" @@ -898,6 +974,10 @@ class ProcessRegistry: while time.monotonic() < deadline: session = self._refresh_detached_session(session) + # Reconcile against real child state — guards against orphaned- + # pipe reader hangs where the reader is blocked but the direct + # child has already exited (issue #17327). + self._reconcile_local_exit(session) if session.exited: self._completion_consumed.add(session_id) result = {