mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-13 09:01:54 +00:00
fix(photon): stop gateway restarts from orphaning the sidecar on its port
A hard gateway exit (crash, SIGKILL, supervisor restart) left the detached Node sidecar running with a token the next gateway run doesn't know, so it could never be told to /shutdown. Every replacement spawn then died on EADDRINUSE, failing each 30→300s reconnect attempt while the orphan kept consuming the inbound gRPC stream. Two layers: - Lifetime binding: the adapter now holds the sidecar's stdin as a pipe, and the sidecar (PHOTON_SIDECAR_WATCH_STDIN=1) shuts down on stdin EOF — fired by the OS on any parent death, including SIGKILL. - Startup reaping: before spawning, the adapter probes the port and terminates a stale listener, but only after verifying its command line is a Photon sidecar; a foreign listener raises a clear error instead of being signalled. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
parent
573c4e6511
commit
a652131c42
3 changed files with 296 additions and 0 deletions
|
|
@ -611,21 +611,118 @@ class PhotonAdapter(BasePlatformAdapter):
|
|||
|
||||
# -- Sidecar lifecycle -------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _find_listener_pids(port: int) -> List[int]:
|
||||
"""PIDs listening on a local TCP port (empty if none/undeterminable)."""
|
||||
try:
|
||||
out = subprocess.run( # noqa: S603, S607
|
||||
["lsof", "-ti", f"tcp:{port}", "-sTCP:LISTEN"],
|
||||
capture_output=True, text=True, timeout=5.0, check=False,
|
||||
)
|
||||
except (OSError, subprocess.TimeoutExpired):
|
||||
return []
|
||||
return [int(tok) for tok in out.stdout.split() if tok.strip().isdigit()]
|
||||
|
||||
@staticmethod
|
||||
def _pid_is_sidecar(pid: int) -> bool:
|
||||
"""True if ``pid``'s command line is a Photon sidecar process."""
|
||||
try:
|
||||
out = subprocess.run( # noqa: S603, S607
|
||||
["ps", "-p", str(pid), "-o", "command="],
|
||||
capture_output=True, text=True, timeout=5.0, check=False,
|
||||
)
|
||||
except (OSError, subprocess.TimeoutExpired):
|
||||
return False
|
||||
# Checkout-agnostic: any Hermes checkout's sidecar entry point.
|
||||
return "photon/sidecar/index.mjs" in out.stdout
|
||||
|
||||
@staticmethod
|
||||
def _pid_alive(pid: int) -> bool:
|
||||
try:
|
||||
os.kill(pid, 0)
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
async def _reap_stale_sidecar(self) -> None:
|
||||
"""Kill an orphaned sidecar squatting our port before spawning ours.
|
||||
|
||||
A hard gateway exit (crash, SIGKILL, supervisor restart) used to leave
|
||||
the detached sidecar running with a token the new gateway doesn't
|
||||
know, so it can't be told to ``/shutdown`` — and every replacement
|
||||
spawn died on EADDRINUSE, failing each reconnect attempt. The
|
||||
stdin-EOF watch prevents new orphans; this reclaims the port from
|
||||
orphans that predate it (or survived it). Listeners are verified by
|
||||
command line before being signalled.
|
||||
"""
|
||||
if sys.platform == "win32": # lsof/ps; orphaning is a POSIX-only path
|
||||
return
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=2.0) as client:
|
||||
await client.post(
|
||||
f"http://{self._sidecar_bind}:{self._sidecar_port}/healthz",
|
||||
headers={"X-Hermes-Sidecar-Token": self._sidecar_token},
|
||||
)
|
||||
except httpx.RequestError:
|
||||
return # nothing listening — the normal case
|
||||
pids = self._find_listener_pids(self._sidecar_port)
|
||||
stale = [pid for pid in pids if self._pid_is_sidecar(pid)]
|
||||
foreign = [pid for pid in pids if pid not in stale]
|
||||
if not stale:
|
||||
raise RuntimeError(
|
||||
f"port {self._sidecar_port} is in use by another process "
|
||||
f"(pids: {foreign or 'unknown'}, not a Photon sidecar) — "
|
||||
f"free it or set PHOTON_SIDECAR_PORT to a different port"
|
||||
)
|
||||
for pid in stale:
|
||||
logger.warning(
|
||||
"[photon] reaping orphaned sidecar (pid %d) on port %d",
|
||||
pid, self._sidecar_port,
|
||||
)
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
except OSError:
|
||||
pass
|
||||
deadline = time.time() + 3.0
|
||||
while time.time() < deadline and any(self._pid_alive(p) for p in stale):
|
||||
await asyncio.sleep(0.1)
|
||||
for pid in stale:
|
||||
if self._pid_alive(pid):
|
||||
try:
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
except OSError:
|
||||
pass
|
||||
# Give the OS a beat to release the listening socket.
|
||||
await asyncio.sleep(0.2)
|
||||
if foreign:
|
||||
raise RuntimeError(
|
||||
f"port {self._sidecar_port} is also held by non-sidecar "
|
||||
f"processes (pids: {foreign}) — free it or set "
|
||||
f"PHOTON_SIDECAR_PORT to a different port"
|
||||
)
|
||||
|
||||
async def _start_sidecar(self) -> None:
|
||||
if not (_SIDECAR_DIR / "node_modules").exists():
|
||||
raise RuntimeError(
|
||||
f"Photon sidecar deps not installed. Run: "
|
||||
f"cd {_SIDECAR_DIR} && npm install (or `hermes photon setup`)"
|
||||
)
|
||||
await self._reap_stale_sidecar()
|
||||
|
||||
env = os.environ.copy()
|
||||
env["PHOTON_PROJECT_ID"] = self._project_id
|
||||
env["PHOTON_PROJECT_SECRET"] = self._project_secret
|
||||
env["PHOTON_SIDECAR_PORT"] = str(self._sidecar_port)
|
||||
env["PHOTON_SIDECAR_BIND"] = self._sidecar_bind
|
||||
env["PHOTON_SIDECAR_TOKEN"] = self._sidecar_token
|
||||
# The sidecar exits when its stdin (the pipe below) hits EOF, so a
|
||||
# gateway death of ANY kind — including SIGKILL, where disconnect()
|
||||
# never runs — can't leave it orphaned on the port.
|
||||
env["PHOTON_SIDECAR_WATCH_STDIN"] = "1"
|
||||
|
||||
self._sidecar_proc = subprocess.Popen( # noqa: S603
|
||||
[self._node_bin, str(_SIDECAR_DIR / "index.mjs")],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
env=env,
|
||||
|
|
@ -682,6 +779,14 @@ class PhotonAdapter(BasePlatformAdapter):
|
|||
if proc is None:
|
||||
return
|
||||
try:
|
||||
# Closing our end of the stdin pipe is itself a shutdown signal
|
||||
# (the sidecar watches for EOF), and covers the case where the
|
||||
# HTTP call below can't get through.
|
||||
if proc.stdin is not None:
|
||||
try:
|
||||
proc.stdin.close()
|
||||
except Exception:
|
||||
pass
|
||||
# Polite shutdown first.
|
||||
if self._http_client is not None:
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -48,6 +48,9 @@
|
|||
// PHOTON_SIDECAR_TOKEN
|
||||
// Optional:
|
||||
// PHOTON_SIDECAR_BIND (default 127.0.0.1)
|
||||
// PHOTON_SIDECAR_WATCH_STDIN "1" = exit when stdin hits EOF (set by the
|
||||
// adapter, which holds our stdin pipe — parent-death
|
||||
// detection so a dead gateway can't orphan us)
|
||||
// PHOTON_TELEMETRY enable Spectrum SDK telemetry ("true"/"1"/"on"/"yes";
|
||||
// default off — toggle with `hermes photon telemetry`)
|
||||
|
||||
|
|
@ -642,7 +645,12 @@ server.listen(port, bind, () => {
|
|||
console.error(`photon-sidecar: listening on ${bind}:${port}`);
|
||||
});
|
||||
|
||||
let stopping = false;
|
||||
async function shutdown(signal) {
|
||||
// Re-entry guard: stdin EOF, a signal and /shutdown can all fire together
|
||||
// during one teardown.
|
||||
if (stopping) return;
|
||||
stopping = true;
|
||||
console.error(`photon-sidecar: received ${signal}, stopping...`);
|
||||
try {
|
||||
await Promise.race([
|
||||
|
|
@ -659,6 +667,18 @@ async function shutdown(signal) {
|
|||
process.on("SIGINT", () => shutdown("SIGINT"));
|
||||
process.on("SIGTERM", () => shutdown("SIGTERM"));
|
||||
|
||||
// Lifetime binding to the parent. The adapter spawns us with stdin as a pipe
|
||||
// it holds open; EOF means the gateway process is gone — including hard
|
||||
// deaths (crash, SIGKILL) where no signal and no /shutdown ever reaches us.
|
||||
// Without this, an orphaned sidecar squats the port and keeps consuming the
|
||||
// inbound gRPC stream, and every replacement spawn dies on EADDRINUSE.
|
||||
// Opt-in via env so manual `node index.mjs` runs aren't affected.
|
||||
if (process.env.PHOTON_SIDECAR_WATCH_STDIN === "1") {
|
||||
process.stdin.resume();
|
||||
process.stdin.on("end", () => shutdown("stdin EOF (parent exited)"));
|
||||
process.stdin.on("error", () => shutdown("stdin error (parent exited)"));
|
||||
}
|
||||
|
||||
// Don't let a stray promise rejection take the process down silently — handlers
|
||||
// catch their own errors, so log and keep serving (Python supervises restart on
|
||||
// a real fatal exit).
|
||||
|
|
|
|||
171
tests/plugins/platforms/photon/test_sidecar_lifecycle.py
Normal file
171
tests/plugins/platforms/photon/test_sidecar_lifecycle.py
Normal file
|
|
@ -0,0 +1,171 @@
|
|||
"""Sidecar lifecycle tests: orphan reaping and parent-death wiring.
|
||||
|
||||
A hard gateway exit used to leave the detached Node sidecar squatting the
|
||||
loopback port with a token the next gateway run doesn't know — every
|
||||
replacement spawn then died on EADDRINUSE. These tests cover the startup
|
||||
reaper (`_reap_stale_sidecar`) and the stdin-pipe lifetime binding, without
|
||||
spawning Node or binding ports.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
from typing import Any, Dict, List, Tuple
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
from plugins.platforms.photon import adapter as photon_adapter
|
||||
from plugins.platforms.photon.adapter import PhotonAdapter
|
||||
|
||||
|
||||
def _make_adapter(monkeypatch: pytest.MonkeyPatch) -> PhotonAdapter:
|
||||
monkeypatch.setenv("PHOTON_PROJECT_ID", "test-project-id")
|
||||
monkeypatch.setenv("PHOTON_PROJECT_SECRET", "test-project-secret")
|
||||
cfg = PlatformConfig(enabled=True, token="", extra={})
|
||||
return PhotonAdapter(cfg)
|
||||
|
||||
|
||||
class _ProbeClient:
|
||||
"""Fake httpx.AsyncClient whose /healthz probe behavior is injectable."""
|
||||
|
||||
connects = True
|
||||
|
||||
def __init__(self, *a: Any, **k: Any) -> None:
|
||||
pass
|
||||
|
||||
async def __aenter__(self) -> "_ProbeClient":
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *a: Any) -> bool:
|
||||
return False
|
||||
|
||||
async def post(self, *a: Any, **k: Any) -> Any:
|
||||
if not self.connects:
|
||||
raise photon_adapter.httpx.ConnectError("connection refused")
|
||||
|
||||
class _Resp:
|
||||
status_code = 401 # orphan with a different token
|
||||
|
||||
return _Resp()
|
||||
|
||||
|
||||
def _capture_kills(monkeypatch: pytest.MonkeyPatch) -> List[Tuple[int, int]]:
|
||||
kills: List[Tuple[int, int]] = []
|
||||
|
||||
def _fake_kill(pid: int, sig: int) -> None:
|
||||
kills.append((pid, sig))
|
||||
|
||||
monkeypatch.setattr(photon_adapter.os, "kill", _fake_kill)
|
||||
return kills
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reap_noop_when_port_free(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
adapter = _make_adapter(monkeypatch)
|
||||
|
||||
class _Refused(_ProbeClient):
|
||||
connects = False
|
||||
|
||||
monkeypatch.setattr(photon_adapter.httpx, "AsyncClient", _Refused)
|
||||
kills = _capture_kills(monkeypatch)
|
||||
|
||||
await adapter._reap_stale_sidecar()
|
||||
|
||||
assert kills == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reap_kills_verified_orphan(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
adapter = _make_adapter(monkeypatch)
|
||||
monkeypatch.setattr(photon_adapter.httpx, "AsyncClient", _ProbeClient)
|
||||
monkeypatch.setattr(adapter, "_find_listener_pids", lambda port: [4242])
|
||||
monkeypatch.setattr(adapter, "_pid_is_sidecar", lambda pid: True)
|
||||
# Dies promptly on SIGTERM — no escalation expected.
|
||||
monkeypatch.setattr(adapter, "_pid_alive", lambda pid: False)
|
||||
kills = _capture_kills(monkeypatch)
|
||||
|
||||
await adapter._reap_stale_sidecar()
|
||||
|
||||
assert kills == [(4242, photon_adapter.signal.SIGTERM)]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reap_escalates_to_sigkill(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
adapter = _make_adapter(monkeypatch)
|
||||
monkeypatch.setattr(photon_adapter.httpx, "AsyncClient", _ProbeClient)
|
||||
monkeypatch.setattr(adapter, "_find_listener_pids", lambda port: [4242])
|
||||
monkeypatch.setattr(adapter, "_pid_is_sidecar", lambda pid: True)
|
||||
monkeypatch.setattr(adapter, "_pid_alive", lambda pid: True) # ignores TERM
|
||||
# No clock fakery (logging also calls time.time, which makes a fake clock
|
||||
# fragile) — this test rides out the real 3s SIGTERM grace window.
|
||||
kills = _capture_kills(monkeypatch)
|
||||
|
||||
await adapter._reap_stale_sidecar()
|
||||
|
||||
assert (4242, photon_adapter.signal.SIGTERM) in kills
|
||||
assert (4242, photon_adapter.signal.SIGKILL) in kills
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reap_raises_for_foreign_listener(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
"""Never signal a process whose command line isn't our sidecar."""
|
||||
adapter = _make_adapter(monkeypatch)
|
||||
monkeypatch.setattr(photon_adapter.httpx, "AsyncClient", _ProbeClient)
|
||||
monkeypatch.setattr(adapter, "_find_listener_pids", lambda port: [777])
|
||||
monkeypatch.setattr(adapter, "_pid_is_sidecar", lambda pid: False)
|
||||
kills = _capture_kills(monkeypatch)
|
||||
|
||||
with pytest.raises(RuntimeError, match="in use by another process"):
|
||||
await adapter._reap_stale_sidecar()
|
||||
|
||||
assert kills == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_sidecar_spawns_with_stdin_pipe(
|
||||
monkeypatch: pytest.MonkeyPatch, tmp_path
|
||||
) -> None:
|
||||
"""The spawn must hold a stdin pipe and enable the sidecar's EOF watch."""
|
||||
adapter = _make_adapter(monkeypatch)
|
||||
|
||||
async def _no_reap() -> None:
|
||||
pass
|
||||
|
||||
monkeypatch.setattr(adapter, "_reap_stale_sidecar", _no_reap)
|
||||
(tmp_path / "node_modules").mkdir()
|
||||
monkeypatch.setattr(photon_adapter, "_SIDECAR_DIR", tmp_path)
|
||||
|
||||
spawned: Dict[str, Any] = {}
|
||||
|
||||
class _FakeProc:
|
||||
pid = 999
|
||||
stdout = None
|
||||
stdin = None
|
||||
|
||||
@staticmethod
|
||||
def poll() -> None:
|
||||
return None
|
||||
|
||||
def _fake_popen(cmd: List[str], **kwargs: Any) -> _FakeProc:
|
||||
spawned["cmd"] = cmd
|
||||
spawned["kwargs"] = kwargs
|
||||
return _FakeProc()
|
||||
|
||||
monkeypatch.setattr(photon_adapter.subprocess, "Popen", _fake_popen)
|
||||
|
||||
class _HealthyClient(_ProbeClient):
|
||||
async def post(self, *a: Any, **k: Any) -> Any:
|
||||
class _Resp:
|
||||
status_code = 200
|
||||
|
||||
return _Resp()
|
||||
|
||||
monkeypatch.setattr(photon_adapter.httpx, "AsyncClient", _HealthyClient)
|
||||
|
||||
await adapter._start_sidecar()
|
||||
|
||||
kwargs = spawned["kwargs"]
|
||||
assert kwargs["stdin"] is subprocess.PIPE
|
||||
assert kwargs["env"]["PHOTON_SIDECAR_WATCH_STDIN"] == "1"
|
||||
Loading…
Add table
Add a link
Reference in a new issue