From a652131c421b314a9cb72eb3044922f0c6273d67 Mon Sep 17 00:00:00 2001
From: underthestars-zhy
Date: Thu, 11 Jun 2026 02:49:11 -0700
Subject: [PATCH] fix(photon): stop gateway restarts from orphaning the sidecar on its port
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

A hard gateway exit (crash, SIGKILL, supervisor restart) left the detached
Node sidecar running with a token the next gateway run doesn't know, so it
could never be told to /shutdown. Every replacement spawn then died on
EADDRINUSE, failing each 30→300s reconnect attempt while the orphan kept
consuming the inbound gRPC stream.

Two layers:

- Lifetime binding: the adapter now holds the sidecar's stdin as a pipe, and
  the sidecar (PHOTON_SIDECAR_WATCH_STDIN=1) shuts down on stdin EOF — fired
  by the OS on any parent death, including SIGKILL.
- Startup reaping: before spawning, the adapter probes the port and
  terminates a stale listener, but only after verifying its command line is
  a Photon sidecar; a foreign listener raises a clear error instead of being
  signalled.

Co-Authored-By: Claude Fable 5 --- plugins/platforms/photon/adapter.py | 105 +++++++++++ plugins/platforms/photon/sidecar/index.mjs | 20 ++ .../photon/test_sidecar_lifecycle.py | 171 ++++++++++++++++++ 3 files changed, 296 insertions(+) create mode 100644 tests/plugins/platforms/photon/test_sidecar_lifecycle.py diff --git a/plugins/platforms/photon/adapter.py b/plugins/platforms/photon/adapter.py index 9673d58c9eb..1b2d1bd24b9 100644 --- a/plugins/platforms/photon/adapter.py +++ b/plugins/platforms/photon/adapter.py @@ -611,21 +611,118 @@ class PhotonAdapter(BasePlatformAdapter): # -- Sidecar lifecycle ------------------------------------------------- + @staticmethod + def _find_listener_pids(port: int) -> List[int]: + """PIDs listening on a local TCP port (empty if none/undeterminable).""" + try: + out = subprocess.run( # noqa: S603, S607 + ["lsof", "-ti", f"tcp:{port}", "-sTCP:LISTEN"], + capture_output=True, text=True, timeout=5.0, check=False, + ) + except (OSError, subprocess.TimeoutExpired): + return [] + return [int(tok) for tok in out.stdout.split() if tok.strip().isdigit()] + + @staticmethod + def _pid_is_sidecar(pid: int) -> bool: + """True if ``pid``'s command line is a Photon sidecar process.""" + try: + out = subprocess.run( # noqa: S603, S607 + ["ps", "-p", str(pid), "-o", "command="], + capture_output=True, text=True, timeout=5.0, check=False, + ) + except (OSError, subprocess.TimeoutExpired): + return False + # Checkout-agnostic: any Hermes checkout's sidecar entry point. + return "photon/sidecar/index.mjs" in out.stdout + + @staticmethod + def _pid_alive(pid: int) -> bool: + try: + os.kill(pid, 0) + return True + except OSError: + return False + + async def _reap_stale_sidecar(self) -> None: + """Kill an orphaned sidecar squatting our port before spawning ours. 
+ + A hard gateway exit (crash, SIGKILL, supervisor restart) used to leave + the detached sidecar running with a token the new gateway doesn't + know, so it can't be told to ``/shutdown`` — and every replacement + spawn died on EADDRINUSE, failing each reconnect attempt. The + stdin-EOF watch prevents new orphans; this reclaims the port from + orphans that predate it (or survived it). Listeners are verified by + command line before being signalled. + """ + if sys.platform == "win32": # lsof/ps; orphaning is a POSIX-only path + return + try: + async with httpx.AsyncClient(timeout=2.0) as client: + await client.post( + f"http://{self._sidecar_bind}:{self._sidecar_port}/healthz", + headers={"X-Hermes-Sidecar-Token": self._sidecar_token}, + ) + except httpx.RequestError: + return # nothing listening — the normal case + pids = self._find_listener_pids(self._sidecar_port) + stale = [pid for pid in pids if self._pid_is_sidecar(pid)] + foreign = [pid for pid in pids if pid not in stale] + if not stale: + raise RuntimeError( + f"port {self._sidecar_port} is in use by another process " + f"(pids: {foreign or 'unknown'}, not a Photon sidecar) — " + f"free it or set PHOTON_SIDECAR_PORT to a different port" + ) + for pid in stale: + logger.warning( + "[photon] reaping orphaned sidecar (pid %d) on port %d", + pid, self._sidecar_port, + ) + try: + os.kill(pid, signal.SIGTERM) + except OSError: + pass + deadline = time.time() + 3.0 + while time.time() < deadline and any(self._pid_alive(p) for p in stale): + await asyncio.sleep(0.1) + for pid in stale: + if self._pid_alive(pid): + try: + os.kill(pid, signal.SIGKILL) + except OSError: + pass + # Give the OS a beat to release the listening socket. 
+ await asyncio.sleep(0.2) + if foreign: + raise RuntimeError( + f"port {self._sidecar_port} is also held by non-sidecar " + f"processes (pids: {foreign}) — free it or set " + f"PHOTON_SIDECAR_PORT to a different port" + ) + async def _start_sidecar(self) -> None: if not (_SIDECAR_DIR / "node_modules").exists(): raise RuntimeError( f"Photon sidecar deps not installed. Run: " f"cd {_SIDECAR_DIR} && npm install (or `hermes photon setup`)" ) + await self._reap_stale_sidecar() + env = os.environ.copy() env["PHOTON_PROJECT_ID"] = self._project_id env["PHOTON_PROJECT_SECRET"] = self._project_secret env["PHOTON_SIDECAR_PORT"] = str(self._sidecar_port) env["PHOTON_SIDECAR_BIND"] = self._sidecar_bind env["PHOTON_SIDECAR_TOKEN"] = self._sidecar_token + # The sidecar exits when its stdin (the pipe below) hits EOF, so a + # gateway death of ANY kind — including SIGKILL, where disconnect() + # never runs — can't leave it orphaned on the port. + env["PHOTON_SIDECAR_WATCH_STDIN"] = "1" self._sidecar_proc = subprocess.Popen( # noqa: S603 [self._node_bin, str(_SIDECAR_DIR / "index.mjs")], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env, @@ -682,6 +779,14 @@ class PhotonAdapter(BasePlatformAdapter): if proc is None: return try: + # Closing our end of the stdin pipe is itself a shutdown signal + # (the sidecar watches for EOF), and covers the case where the + # HTTP call below can't get through. + if proc.stdin is not None: + try: + proc.stdin.close() + except Exception: + pass # Polite shutdown first. 
if self._http_client is not None: try: diff --git a/plugins/platforms/photon/sidecar/index.mjs b/plugins/platforms/photon/sidecar/index.mjs index 91073f32b4a..0ca723764a5 100644 --- a/plugins/platforms/photon/sidecar/index.mjs +++ b/plugins/platforms/photon/sidecar/index.mjs @@ -48,6 +48,9 @@ // PHOTON_SIDECAR_TOKEN // Optional: // PHOTON_SIDECAR_BIND (default 127.0.0.1) +// PHOTON_SIDECAR_WATCH_STDIN "1" = exit when stdin hits EOF (set by the +// adapter, which holds our stdin pipe — parent-death +// detection so a dead gateway can't orphan us) // PHOTON_TELEMETRY enable Spectrum SDK telemetry ("true"/"1"/"on"/"yes"; // default off — toggle with `hermes photon telemetry`) @@ -642,7 +645,12 @@ server.listen(port, bind, () => { console.error(`photon-sidecar: listening on ${bind}:${port}`); }); +let stopping = false; async function shutdown(signal) { + // Re-entry guard: stdin EOF, a signal and /shutdown can all fire together + // during one teardown. + if (stopping) return; + stopping = true; console.error(`photon-sidecar: received ${signal}, stopping...`); try { await Promise.race([ @@ -659,6 +667,18 @@ async function shutdown(signal) { process.on("SIGINT", () => shutdown("SIGINT")); process.on("SIGTERM", () => shutdown("SIGTERM")); +// Lifetime binding to the parent. The adapter spawns us with stdin as a pipe +// it holds open; EOF means the gateway process is gone — including hard +// deaths (crash, SIGKILL) where no signal and no /shutdown ever reaches us. +// Without this, an orphaned sidecar squats the port and keeps consuming the +// inbound gRPC stream, and every replacement spawn dies on EADDRINUSE. +// Opt-in via env so manual `node index.mjs` runs aren't affected. 
+if (process.env.PHOTON_SIDECAR_WATCH_STDIN === "1") { + process.stdin.resume(); + process.stdin.on("end", () => shutdown("stdin EOF (parent exited)")); + process.stdin.on("error", () => shutdown("stdin error (parent exited)")); +} + // Don't let a stray promise rejection take the process down silently — handlers // catch their own errors, so log and keep serving (Python supervises restart on // a real fatal exit). diff --git a/tests/plugins/platforms/photon/test_sidecar_lifecycle.py b/tests/plugins/platforms/photon/test_sidecar_lifecycle.py new file mode 100644 index 00000000000..31b005c2488 --- /dev/null +++ b/tests/plugins/platforms/photon/test_sidecar_lifecycle.py @@ -0,0 +1,171 @@ +"""Sidecar lifecycle tests: orphan reaping and parent-death wiring. + +A hard gateway exit used to leave the detached Node sidecar squatting the +loopback port with a token the next gateway run doesn't know — every +replacement spawn then died on EADDRINUSE. These tests cover the startup +reaper (`_reap_stale_sidecar`) and the stdin-pipe lifetime binding, without +spawning Node or binding ports. 
+""" +from __future__ import annotations + +import subprocess +from typing import Any, Dict, List, Tuple + +import pytest + +from gateway.config import PlatformConfig +from plugins.platforms.photon import adapter as photon_adapter +from plugins.platforms.photon.adapter import PhotonAdapter + + +def _make_adapter(monkeypatch: pytest.MonkeyPatch) -> PhotonAdapter: + monkeypatch.setenv("PHOTON_PROJECT_ID", "test-project-id") + monkeypatch.setenv("PHOTON_PROJECT_SECRET", "test-project-secret") + cfg = PlatformConfig(enabled=True, token="", extra={}) + return PhotonAdapter(cfg) + + +class _ProbeClient: + """Fake httpx.AsyncClient whose /healthz probe behavior is injectable.""" + + connects = True + + def __init__(self, *a: Any, **k: Any) -> None: + pass + + async def __aenter__(self) -> "_ProbeClient": + return self + + async def __aexit__(self, *a: Any) -> bool: + return False + + async def post(self, *a: Any, **k: Any) -> Any: + if not self.connects: + raise photon_adapter.httpx.ConnectError("connection refused") + + class _Resp: + status_code = 401 # orphan with a different token + + return _Resp() + + +def _capture_kills(monkeypatch: pytest.MonkeyPatch) -> List[Tuple[int, int]]: + kills: List[Tuple[int, int]] = [] + + def _fake_kill(pid: int, sig: int) -> None: + kills.append((pid, sig)) + + monkeypatch.setattr(photon_adapter.os, "kill", _fake_kill) + return kills + + +@pytest.mark.asyncio +async def test_reap_noop_when_port_free(monkeypatch: pytest.MonkeyPatch) -> None: + adapter = _make_adapter(monkeypatch) + + class _Refused(_ProbeClient): + connects = False + + monkeypatch.setattr(photon_adapter.httpx, "AsyncClient", _Refused) + kills = _capture_kills(monkeypatch) + + await adapter._reap_stale_sidecar() + + assert kills == [] + + +@pytest.mark.asyncio +async def test_reap_kills_verified_orphan(monkeypatch: pytest.MonkeyPatch) -> None: + adapter = _make_adapter(monkeypatch) + monkeypatch.setattr(photon_adapter.httpx, "AsyncClient", _ProbeClient) + 
monkeypatch.setattr(adapter, "_find_listener_pids", lambda port: [4242]) + monkeypatch.setattr(adapter, "_pid_is_sidecar", lambda pid: True) + # Dies promptly on SIGTERM — no escalation expected. + monkeypatch.setattr(adapter, "_pid_alive", lambda pid: False) + kills = _capture_kills(monkeypatch) + + await adapter._reap_stale_sidecar() + + assert kills == [(4242, photon_adapter.signal.SIGTERM)] + + +@pytest.mark.asyncio +async def test_reap_escalates_to_sigkill(monkeypatch: pytest.MonkeyPatch) -> None: + adapter = _make_adapter(monkeypatch) + monkeypatch.setattr(photon_adapter.httpx, "AsyncClient", _ProbeClient) + monkeypatch.setattr(adapter, "_find_listener_pids", lambda port: [4242]) + monkeypatch.setattr(adapter, "_pid_is_sidecar", lambda pid: True) + monkeypatch.setattr(adapter, "_pid_alive", lambda pid: True) # ignores TERM + # No clock fakery (logging also calls time.time, which makes a fake clock + # fragile) — this test rides out the real 3s SIGTERM grace window. + kills = _capture_kills(monkeypatch) + + await adapter._reap_stale_sidecar() + + assert (4242, photon_adapter.signal.SIGTERM) in kills + assert (4242, photon_adapter.signal.SIGKILL) in kills + + +@pytest.mark.asyncio +async def test_reap_raises_for_foreign_listener( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Never signal a process whose command line isn't our sidecar.""" + adapter = _make_adapter(monkeypatch) + monkeypatch.setattr(photon_adapter.httpx, "AsyncClient", _ProbeClient) + monkeypatch.setattr(adapter, "_find_listener_pids", lambda port: [777]) + monkeypatch.setattr(adapter, "_pid_is_sidecar", lambda pid: False) + kills = _capture_kills(monkeypatch) + + with pytest.raises(RuntimeError, match="in use by another process"): + await adapter._reap_stale_sidecar() + + assert kills == [] + + +@pytest.mark.asyncio +async def test_start_sidecar_spawns_with_stdin_pipe( + monkeypatch: pytest.MonkeyPatch, tmp_path +) -> None: + """The spawn must hold a stdin pipe and enable the sidecar's EOF 
watch.""" + adapter = _make_adapter(monkeypatch) + + async def _no_reap() -> None: + pass + + monkeypatch.setattr(adapter, "_reap_stale_sidecar", _no_reap) + (tmp_path / "node_modules").mkdir() + monkeypatch.setattr(photon_adapter, "_SIDECAR_DIR", tmp_path) + + spawned: Dict[str, Any] = {} + + class _FakeProc: + pid = 999 + stdout = None + stdin = None + + @staticmethod + def poll() -> None: + return None + + def _fake_popen(cmd: List[str], **kwargs: Any) -> _FakeProc: + spawned["cmd"] = cmd + spawned["kwargs"] = kwargs + return _FakeProc() + + monkeypatch.setattr(photon_adapter.subprocess, "Popen", _fake_popen) + + class _HealthyClient(_ProbeClient): + async def post(self, *a: Any, **k: Any) -> Any: + class _Resp: + status_code = 200 + + return _Resp() + + monkeypatch.setattr(photon_adapter.httpx, "AsyncClient", _HealthyClient) + + await adapter._start_sidecar() + + kwargs = spawned["kwargs"] + assert kwargs["stdin"] is subprocess.PIPE + assert kwargs["env"]["PHOTON_SIDECAR_WATCH_STDIN"] == "1"