def _env_seconds(name, default):
    """Read a non-negative, finite duration in seconds from the environment.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is unset or unusable.

    Returns:
        The parsed value as a float, or ``float(default)`` when the variable
        is unset, empty, not a number, negative, NaN or infinite.
    """
    raw = os.environ.get(name)
    if raw is None:
        return float(default)
    try:
        value = float(raw)
    except ValueError:
        # A malformed override must not crash the worker at import time.
        return float(default)
    # NaN fails both comparisons; negative and infinite values would make
    # time.sleep() raise inside the watchdog thread and silently disable it.
    if 0.0 <= value < float("inf"):
        return value
    return float(default)


# Env-overridable so the integration test can drive sub-second timing.
_WATCHDOG_POLL_S = _env_seconds("HERMES_SLASH_WATCHDOG_POLL_S", 2.0)
_ORPHAN_GRACE_S = _env_seconds("HERMES_SLASH_WATCHDOG_GRACE_S", 5.0)
_in_flight = threading.Event()  # set while a command is executing


def _is_orphaned(original_ppid, parent_create_time, getppid=os.getppid) -> bool:
    """Return True once the gateway process that spawned this worker is gone.

    The current parent pid is compared with the ORIGINAL ppid rather than
    with 1, because Linux may reparent an orphan to a subreaper instead of
    init. PID reuse is guarded against by also comparing the parent's
    creation time with the value recorded at startup.

    Args:
        original_ppid: Parent pid captured when the worker started.
        parent_create_time: ``create_time()`` of that parent captured at the
            same moment.
        getppid: Callable returning the current parent pid; injectable so
            tests can simulate reparenting.

    Returns:
        True when the parent pid changed, the original parent no longer
        exists, or its creation time differs from ``parent_create_time``.
        False when the parent still matches, or when it cannot be inspected
        for a reason other than having exited.
    """
    if getppid() != original_ppid:
        return True
    try:
        # Process() raises NoSuchProcess itself for a dead pid, so a separate
        # pid_exists() probe would only add a race window.
        current_create_time = psutil.Process(original_ppid).create_time()
    except psutil.NoSuchProcess:
        # Includes the ZombieProcess subclass: the gateway has exited.
        return True
    except psutil.Error:
        # e.g. AccessDenied: failing to inspect the parent is not evidence
        # that it died. The ppid check above already passed, so keep running
        # instead of killing a worker whose gateway is still alive.
        return False
    return current_create_time != parent_create_time


def _start_parent_death_watchdog(original_ppid, parent_create_time) -> None:
    """Start a daemon thread that exits this process once it is orphaned.

    The thread polls ``_is_orphaned`` every ``_WATCHDOG_POLL_S`` seconds.
    When the parent is gone it waits up to ``_ORPHAN_GRACE_S`` seconds for
    ``_in_flight`` to clear, then terminates the process.

    Args:
        original_ppid: Parent pid captured when the worker started.
        parent_create_time: Creation time of that parent, used to detect
            PID reuse.
    """

    def _loop():
        while not _is_orphaned(original_ppid, parent_create_time):
            time.sleep(_WATCHDOG_POLL_S)
        deadline = time.monotonic() + _ORPHAN_GRACE_S
        while _in_flight.is_set() and time.monotonic() < deadline:
            time.sleep(0.05)  # let an in-flight command finish/flush
        # os._exit() ends the process immediately from this non-main thread,
        # without running cleanup handlers or flushing stdio buffers.
        os._exit(0)

    threading.Thread(target=_loop, daemon=True).start()