diff --git a/tests/tui_gateway/test_protocol.py b/tests/tui_gateway/test_protocol.py
index 42caaacc58..6c94ec0710 100644
--- a/tests/tui_gateway/test_protocol.py
+++ b/tests/tui_gateway/test_protocol.py
@@ -83,6 +83,134 @@ def test_write_json_broken_pipe(server):
     assert server.write_json({"x": 1}) is False
 
 
+def test_write_json_closed_stream_returns_false(server):
+    """ValueError ('I/O on closed file') used to bubble up; treat as gone."""
+
+    class _Closed:
+        def write(self, _): raise ValueError("I/O operation on closed file")
+        def flush(self): raise ValueError("I/O operation on closed file")
+
+    server._real_stdout = _Closed()
+    assert server.write_json({"x": 1}) is False
+
+
+def test_write_json_unicode_encode_error_re_raises(server):
+    """A non-UTF-8 stdout encoding raises UnicodeEncodeError (a ValueError
+    subclass). It must NOT be swallowed as 'peer gone' — that would let
+    `entry.py` exit cleanly via the False path and hide the real config
+    bug. We re-raise so the existing crash-log infrastructure records it."""
+
+    class _AsciiOnly:
+        def write(self, line):
+            line.encode("ascii")  # raises UnicodeEncodeError on non-ascii
+        def flush(self): pass
+
+    server._real_stdout = _AsciiOnly()
+    with pytest.raises(UnicodeEncodeError):
+        server.write_json({"msg": "héllo"})
+
+
+def test_write_json_unrelated_value_error_re_raises(server):
+    """Only ValueError('...closed file...') means peer gone. Other
+    ValueErrors are programming errors and must surface."""
+
+    class _BadValue:
+        def write(self, _): raise ValueError("something else entirely")
+        def flush(self): pass
+
+    server._real_stdout = _BadValue()
+    with pytest.raises(ValueError, match="something else entirely"):
+        server.write_json({"x": 1})
+
+
+def test_write_json_non_serializable_payload_re_raises(server):
+    """Non-JSON-safe payloads are programming errors — they must NOT be
+    silently dropped via the False path (which would trigger a clean exit
+    in entry.py and mask the real bug)."""
+    import io
+
+    server._real_stdout = io.StringIO()
+    with pytest.raises(TypeError):
+        server.write_json({"obj": object()})
+
+
+def test_write_json_peer_gone_oserror_on_flush_returns_false(server):
+    """A flush that raises a peer-gone OSError (EPIPE) must not strand
+    the lock or crash; it returns False so the dispatcher exits cleanly."""
+    import errno
+
+    written = []
+
+    class _FlushPeerGone:
+        def write(self, line): written.append(line)
+        def flush(self): raise OSError(errno.EPIPE, "broken pipe")
+
+    server._real_stdout = _FlushPeerGone()
+    assert server.write_json({"x": 1}) is False
+    assert written and json.loads(written[0]) == {"x": 1}
+
+
+def test_write_json_non_peer_gone_oserror_re_raises(server):
+    """Host I/O failures (ENOSPC, EACCES, EIO …) are NOT peer-gone — they
+    must re-raise so the crash log records them instead of looking like
+    a clean disconnect via the False path."""
+    import errno
+
+    class _DiskFull:
+        def write(self, _): raise OSError(errno.ENOSPC, "no space left")
+        def flush(self): pass
+
+    server._real_stdout = _DiskFull()
+    with pytest.raises(OSError, match="no space"):
+        server.write_json({"x": 1})
+
+
+def test_write_json_skips_flush_when_disable_flush_true(monkeypatch):
+    """`StdioTransport` skips flush when `_DISABLE_FLUSH` is true.
+
+    Tests the runtime *behaviour* via direct module-attr patch. The env
+    var → module constant wiring is covered by the dedicated env test
+    below; reloading server.py here would re-register atexit hooks and
+    recreate the worker pool.
+    """
+    import importlib
+
+    transport_mod = importlib.import_module("tui_gateway.transport")
+    monkeypatch.setattr(transport_mod, "_DISABLE_FLUSH", True)
+
+    flushed = {"count": 0}
+    written = []
+
+    class _Stream:
+        def write(self, line): written.append(line)
+        def flush(self): flushed["count"] += 1
+
+    stream = _Stream()
+    transport = transport_mod.StdioTransport(lambda: stream, threading.Lock())
+
+    assert transport.write({"x": 1}) is True
+    assert flushed["count"] == 0
+
+
+def test_disable_flush_env_var_actually_wires_to_module_constant(monkeypatch):
+    """End-to-end: setting `HERMES_TUI_GATEWAY_NO_FLUSH=1` and importing
+    `tui_gateway.transport` fresh actually flips `_DISABLE_FLUSH` true.
+
+    Reloads only the transport module — server.py is untouched so its
+    atexit hooks/worker pool stay intact."""
+    import importlib
+
+    monkeypatch.setenv("HERMES_TUI_GATEWAY_NO_FLUSH", "1")
+    transport_mod = importlib.reload(importlib.import_module("tui_gateway.transport"))
+
+    try:
+        assert transport_mod._DISABLE_FLUSH is True
+    finally:
+        # Restore the env-disabled state so other tests see the default.
+        monkeypatch.delenv("HERMES_TUI_GATEWAY_NO_FLUSH", raising=False)
+        importlib.reload(transport_mod)
+
+
 # ── _emit ────────────────────────────────────────────────────────────
 
 
diff --git a/tui_gateway/entry.py b/tui_gateway/entry.py
index 38e00ecfac..70fc851820 100644
--- a/tui_gateway/entry.py
+++ b/tui_gateway/entry.py
@@ -29,6 +29,41 @@ def _install_sidecar_publisher() -> None:
     )
 
 
+# How long to wait for orderly shutdown (atexit + finalisers) before
+# falling back to ``os._exit(0)`` so a wedged worker mid-flush can't
+# strand the process. 1s covers the gateway's own shutdown work
+# (thread-pool drain + session finalize) on every machine we've
+# tested; override via ``HERMES_TUI_GATEWAY_SHUTDOWN_GRACE_S`` if a
+# slower environment needs more headroom (e.g. encrypted disks
+# flushing checkpoints) and accept that a longer grace also means a
+# longer wait when shutdown actually deadlocks.
+_DEFAULT_SHUTDOWN_GRACE_S = 1.0
+
+
+def _shutdown_grace_seconds() -> float:
+    """Return the hard-exit grace period in seconds.
+
+    Reads ``HERMES_TUI_GATEWAY_SHUTDOWN_GRACE_S``; unset, unparsable,
+    non-positive, NaN, infinite or over-large values fall back to
+    ``_DEFAULT_SHUTDOWN_GRACE_S``.
+    """
+    import threading as _threading
+
+    raw = (os.environ.get("HERMES_TUI_GATEWAY_SHUTDOWN_GRACE_S") or "").strip()
+    if not raw:
+        return _DEFAULT_SHUTDOWN_GRACE_S
+    try:
+        value = float(raw)
+    except ValueError:
+        return _DEFAULT_SHUTDOWN_GRACE_S
+    # ``threading.Timer`` raises OverflowError inside its own thread for
+    # timeouts above ``TIMEOUT_MAX`` (e.g. "inf"), which would silently
+    # disable the hard-exit safety net. NaN fails both comparisons.
+    if 0 < value <= _threading.TIMEOUT_MAX:
+        return value
+    return _DEFAULT_SHUTDOWN_GRACE_S
+
+
 def _log_signal(signum: int, frame) -> None:
     """Capture WHICH thread and WHERE a termination signal hit us.
 
@@ -38,6 +73,15 @@ def _log_signal(signum: int, frame) -> None:
     handler the gateway-exited banner in the TUI has no trace — the crash
     log never sees a Python exception because the kernel reaps the
     process before the interpreter runs anything.
+
+    Termination semantics: ``sys.exit(0)`` here used to race the worker
+    pool — a thread holding ``_stdout_lock`` mid-flush would block the
+    interpreter shutdown indefinitely. We now log the stack, give the
+    process the configured shutdown grace
+    (``HERMES_TUI_GATEWAY_SHUTDOWN_GRACE_S``, default
+    ``_DEFAULT_SHUTDOWN_GRACE_S``) to drain naturally on a background
+    thread, and fall back to ``os._exit(0)`` so a wedged write/flush
+    can never strand the process.
     """
     name = {
         signal.SIGPIPE: "SIGPIPE",
@@ -62,7 +106,31 @@ def _log_signal(signum: int, frame) -> None:
     except Exception:
         pass
     print(f"[gateway-signal] {name}", file=sys.stderr, flush=True)
-    sys.exit(0)
+
+    import threading as _threading
+
+    def _hard_exit() -> None:
+        # If a worker thread is still mid-flush on a half-closed pipe,
+        # ``sys.exit(0)`` alone could block interpreter shutdown forever
+        # waiting on it. ``os._exit`` skips atexit handlers but
+        # breaks the deadlock. The crash log + stderr line above are
+        # the forensic trail.
+        os._exit(0)
+
+    timer = _threading.Timer(_shutdown_grace_seconds(), _hard_exit)
+    timer.daemon = True
+    timer.start()
+
+    try:
+        sys.exit(0)
+    except SystemExit:
+        # Re-raise so the main-thread interpreter unwinds and runs
+        # atexit + finalisers inside the grace window. Python signal
+        # handlers always run on the main thread, but a worker thread
+        # holding ``_stdout_lock`` mid-flush can keep that unwind
+        # waiting indefinitely; the daemon timer above is the safety
+        # net for that exact case.
+        raise
 
 
 # SIGPIPE: ignore, don't exit. The old SIG_DFL killed the process
diff --git a/tui_gateway/transport.py b/tui_gateway/transport.py
index a1b4b283db..ce93e518a3 100644
--- a/tui_gateway/transport.py
+++ b/tui_gateway/transport.py
@@ -23,10 +23,45 @@ the stream lazily through a callback.
 from __future__ import annotations
 
 import contextvars
+import errno
 import json
+import logging
+import os
 import threading
 from typing import Any, Callable, Optional, Protocol, runtime_checkable
 
+# Errno values that mean "the peer is gone" rather than "the host has a
+# real I/O problem". Anything outside this set re-raises so it surfaces
+# in the crash log instead of looking like a clean disconnect.
+_PEER_GONE_ERRNOS = frozenset({
+    errno.EPIPE,  # write to closed pipe (POSIX)
+    errno.ECONNRESET,  # peer reset the connection
+    errno.EBADF,  # fd closed under us
+    errno.ESHUTDOWN,  # transport endpoint shut down
+    getattr(errno, "WSAECONNRESET", -1),  # win32 mapping (no-op on POSIX)
+    getattr(errno, "WSAESHUTDOWN", -1),
+} - {-1})
+
+logger = logging.getLogger(__name__)
+
+# Optional knob: when true, StdioTransport does not call ``stream.flush``
+# after writing. Use this on environments where a half-closed pipe (TUI
+# Node parent quit while the gateway is still emitting events) makes
+# flush block long enough to starve the rest of the worker pool.
+#
+# IMPORTANT: Python text stdout is fully buffered when attached to a
+# pipe (the TUI case), so this knob ONLY makes sense when the gateway
+# is launched with ``-u`` or ``PYTHONUNBUFFERED=1``. Without one of
+# those, JSON-RPC frames will accumulate in the buffer and the TUI
+# will hang waiting for ``gateway.ready``. Default stays off so the
+# existing flush-after-write behaviour is unchanged.
+_DISABLE_FLUSH = (os.environ.get("HERMES_TUI_GATEWAY_NO_FLUSH", "") or "").strip().lower() in {
+    "1",
+    "true",
+    "yes",
+    "on",
+}
+
 
 @runtime_checkable
 class Transport(Protocol):
@@ -77,15 +112,72 @@ class StdioTransport:
         self._lock = lock
 
     def write(self, obj: dict) -> bool:
+        """Return ``True`` on success, ``False`` ONLY when the peer is gone.
+
+        Returning ``False`` is the dispatcher's "broken stdout pipe" signal
+        — ``entry.py`` calls ``sys.exit(0)`` when ``write_json`` reports
+        ``False``. So programming errors (non-JSON-safe payloads, encoding
+        misconfig, unexpected ValueErrors, host I/O bugs like ENOSPC) MUST
+        NOT return ``False``, otherwise a real bug looks like a clean
+        disconnect and is harder to diagnose. Those re-raise so the
+        existing crash-log infrastructure records the traceback.
+
+        Peer-gone branches:
+          * ``BrokenPipeError``
+          * ``ValueError("...closed file...")``
+          * ``OSError`` whose errno is in :data:`_PEER_GONE_ERRNOS`
+            (EPIPE / ECONNRESET / EBADF / ESHUTDOWN; plus WSA mappings
+            on Windows). Other OSError errnos (ENOSPC, EACCES, ...) are
+            real host problems and re-raise.
+        """
+        # Serialization is OUTSIDE the lock so a large payload can't
+        # block other threads emitting their own frames. A non-JSON-safe
+        # payload is a programming error: re-raise so the crash log
+        # captures it instead of silently exiting via the False path.
         line = json.dumps(obj, ensure_ascii=False) + "\n"
-        try:
-            with self._lock:
-                stream = self._stream_getter()
+
+        with self._lock:
+            stream = self._stream_getter()
+            try:
                 stream.write(line)
-                stream.flush()
-            return True
-        except BrokenPipeError:
-            return False
+            except BrokenPipeError:
+                return False
+            except ValueError as e:
+                # ValueError("I/O operation on closed file") is the
+                # ONLY ValueError that means "peer gone". Anything
+                # else — including UnicodeEncodeError, which is a
+                # ValueError subclass for misconfigured locales —
+                # is a real bug; re-raise so it surfaces in the crash log.
+                if isinstance(e, UnicodeEncodeError) or "closed file" not in str(e):
+                    raise
+                return False
+            except OSError as e:
+                if e.errno not in _PEER_GONE_ERRNOS:
+                    raise
+                logger.debug("StdioTransport write peer gone: %s", e)
+                return False
+
+            # A flush that *raises* with a peer-gone errno means the
+            # dispatcher should exit cleanly. A flush that *hangs* on
+            # a half-closed pipe holds the lock until it returns — see
+            # ``_DISABLE_FLUSH`` for the "skip flush entirely" escape
+            # hatch.
+            if not _DISABLE_FLUSH:
+                try:
+                    stream.flush()
+                except BrokenPipeError:
+                    return False
+                except ValueError as e:
+                    if isinstance(e, UnicodeEncodeError) or "closed file" not in str(e):
+                        raise
+                    return False
+                except OSError as e:
+                    if e.errno not in _PEER_GONE_ERRNOS:
+                        raise
+                    logger.debug("StdioTransport flush peer gone: %s", e)
+                    return False
+
+        return True
 
     def close(self) -> None:
         return None