From 06cbc3bae98558cfbb40313cb5a59ec1ad7e1ae5 Mon Sep 17 00:00:00 2001 From: helix4u <4317663+helix4u@users.noreply.github.com> Date: Mon, 22 Jun 2026 17:52:46 -0600 Subject: [PATCH] fix(photon): recover degraded upstream stream --- gateway/run.py | 4 +- plugins/platforms/photon/adapter.py | 63 ++++++++++ plugins/platforms/photon/sidecar/index.mjs | 111 +++++++++++++++++- tests/gateway/test_platform_reconnect.py | 19 ++- .../photon/test_overflow_recovery.py | 39 ++++++ .../platforms/photon/test_spectrum_patch.py | 9 ++ 6 files changed, 241 insertions(+), 4 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index bc7f42aa8e9..dbce984deb4 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3494,7 +3494,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew self._failed_platforms[adapter.platform] = { "config": platform_config, "attempts": 0, - "next_retry": time.monotonic() + 30, + "next_retry": time.monotonic(), } logger.info( "%s queued for background reconnection", @@ -6367,6 +6367,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew for _ in range(30): if not self._running: return + if self._failed_platforms: + break await asyncio.sleep(1) continue diff --git a/plugins/platforms/photon/adapter.py b/plugins/platforms/photon/adapter.py index d025b8e3d29..5a3a160c0fc 100644 --- a/plugins/platforms/photon/adapter.py +++ b/plugins/platforms/photon/adapter.py @@ -91,7 +91,11 @@ _SIDECAR_DIR = Path(__file__).parent / "sidecar" _PHOTON_RETRYABLE_PATTERNS = ( "internal sidecar error", "upstream connect error", + "upstream unavailable", + "connection dropped", "reset reason: overflow", + "upstream_overflow", + "upstream_unavailable", ) # Minimum seconds between typing-indicator calls for the same chat. @@ -235,8 +239,10 @@ class PhotonAdapter(BasePlatformAdapter): self._sidecar_proc: Optional[subprocess.Popen] = None self._sidecar_supervisor_task: Optional[asyncio.Task] = None self._inbound_task: Optional[asyncio.Task] = None + self._sidecar_health_task: Optional[asyncio.Task] = None self._inbound_running = False self._http_client: Optional["httpx.AsyncClient"] = None + self._sidecar_health_interval = 15.0 # Lightweight in-memory dedup. The gRPC stream is at-least-once, so we # may see the same messageId more than once (e.g. after a reconnect). self._seen_messages: Dict[str, float] = {} @@ -370,6 +376,9 @@ class PhotonAdapter(BasePlatformAdapter): self._inbound_task = asyncio.get_event_loop().create_task( self._inbound_loop() ) + self._sidecar_health_task = asyncio.get_event_loop().create_task( + self._monitor_sidecar_health() + ) self._mark_connected() logger.info( @@ -380,6 +389,17 @@ class PhotonAdapter(BasePlatformAdapter): async def disconnect(self) -> None: self._inbound_running = False + if self._sidecar_health_task is not None: + task = self._sidecar_health_task + self._sidecar_health_task = None + task.cancel() + if task is not asyncio.current_task(): + try: + await task + except asyncio.CancelledError: + pass + except Exception: + pass if self._inbound_task is not None: self._inbound_task.cancel() try: @@ -440,6 +460,49 @@ class PhotonAdapter(BasePlatformAdapter): await asyncio.sleep(backoff) backoff = min(backoff * 2, 30.0) + async def _monitor_sidecar_health(self) -> None: + """Promote degraded upstream Photon stream health into reconnect. + + The sidecar HTTP process can stay alive while spectrum-ts repeatedly + fails to maintain the upstream inbound gRPC stream. Polling `/healthz` + keeps that from becoming a silent inbound outage. + """ + while self._inbound_running: + await asyncio.sleep(self._sidecar_health_interval) + if not self._inbound_running: + break + try: + data = await self._sidecar_call("/healthz", {}) + except asyncio.CancelledError: + raise + except Exception as exc: + logger.debug("[photon] sidecar health check failed: %s", exc) + continue + + stream = data.get("stream") if isinstance(data, dict) else None + if not isinstance(stream, dict) or stream.get("ok") is not False: + continue + + state = str(stream.get("state") or "unknown") + degraded_for_ms = stream.get("degradedForMs") + last_issue = str(stream.get("lastIssue") or "unknown stream issue") + message = ( + "Photon upstream stream degraded" + f" (state={state}, degradedForMs={degraded_for_ms}): " + f"{last_issue}" + ) + logger.error("[photon] %s", message) + self._set_fatal_error( + "UPSTREAM_STREAM_DEGRADED", + message, + retryable=True, + ) + try: + await self._notify_fatal_error() + except Exception as exc: # pragma: no cover - defensive + logger.warning("[photon] fatal-error notification failed: %s", exc) + break + async def _on_inbound_line(self, line: str) -> None: try: event = json.loads(line) diff --git a/plugins/platforms/photon/sidecar/index.mjs b/plugins/platforms/photon/sidecar/index.mjs index 85c3aa28736..148e9d377eb 100644 --- a/plugins/platforms/photon/sidecar/index.mjs +++ b/plugins/platforms/photon/sidecar/index.mjs @@ -80,6 +80,109 @@ const E164_RE = /^\+\d{6,}$/; const MAX_KNOWN_SPACES = 2048; const MAX_KNOWN_MESSAGES = 1024; const MAX_REACTION_HANDLES = 512; +const STREAM_DEGRADED_RESTART_MS = + Number(process.env.PHOTON_STREAM_DEGRADED_RESTART_MS) || 90 * 1000; +const STREAM_INTERRUPTED_DEGRADE_COUNT = + Number(process.env.PHOTON_STREAM_INTERRUPTED_DEGRADE_COUNT) || 3; + +const streamHealth = { + state: "starting", + degradedSince: null, + lastHealthyAt: null, + lastIssueAt: null, + lastIssue: null, + issueCount: 0, +}; +let streamRestartTimer = null; + +function streamHealthSnapshot() { + const now = Date.now(); + const degradedForMs = + streamHealth.degradedSince === null ? 0 : now - streamHealth.degradedSince; + return { + ok: streamHealth.state !== "degraded", + state: streamHealth.state, + degradedForMs, + restartAfterMs: STREAM_DEGRADED_RESTART_MS, + lastHealthyAt: streamHealth.lastHealthyAt, + lastIssueAt: streamHealth.lastIssueAt, + lastIssue: streamHealth.lastIssue, + issueCount: streamHealth.issueCount, + }; +} + +function markStreamHealthy() { + streamHealth.state = "healthy"; + streamHealth.degradedSince = null; + streamHealth.lastHealthyAt = new Date().toISOString(); + streamHealth.issueCount = 0; + if (streamRestartTimer) { + clearTimeout(streamRestartTimer); + streamRestartTimer = null; + } +} + +function scheduleStreamRestart() { + if (STREAM_DEGRADED_RESTART_MS <= 0 || streamRestartTimer) return; + streamRestartTimer = setTimeout(() => { + streamRestartTimer = null; + if (streamHealth.state !== "degraded" || streamHealth.degradedSince === null) { + return; + } + const degradedForMs = Date.now() - streamHealth.degradedSince; + if (degradedForMs < STREAM_DEGRADED_RESTART_MS) { + scheduleStreamRestart(); + return; + } + console.error( + `photon-sidecar: upstream stream degraded for ${degradedForMs}ms; ` + + "exiting so Hermes can restart the Photon adapter" + ); + process.exit(75); + }, STREAM_DEGRADED_RESTART_MS + 1000); + streamRestartTimer.unref(); +} + +function markStreamDegraded(reason) { + const now = Date.now(); + if (streamHealth.state !== "degraded") { + streamHealth.degradedSince = now; + } + streamHealth.state = "degraded"; + streamHealth.lastIssueAt = new Date(now).toISOString(); + streamHealth.lastIssue = reason; + streamHealth.issueCount += 1; + scheduleStreamRestart(); +} + +function markStreamRecovering(reason) { + if (streamHealth.state !== "recovering") { + streamHealth.issueCount = 0; + } + streamHealth.state = "recovering"; + streamHealth.lastIssueAt = new Date().toISOString(); + streamHealth.lastIssue = reason; + streamHealth.issueCount += 1; + if (streamHealth.issueCount >= STREAM_INTERRUPTED_DEGRADE_COUNT) { + markStreamDegraded(reason); + } +} + +const originalConsoleError = console.error.bind(console); +console.error = (...args) => { + const text = args + .map((arg) => (arg && arg.stack ? arg.stack : String(arg))) + .join(" "); + if (text.includes("[spectrum.stream]")) { + const reason = text.split("\n", 1)[0]; + if (text.includes("persistently failing")) { + markStreamDegraded(reason); + } else if (text.includes("stream interrupted")) { + markStreamRecovering(reason); + } + } + originalConsoleError(...args); +}; if (!projectId || !projectSecret || !sharedToken) { console.error( @@ -353,6 +456,7 @@ async function normalizeEvent(space, message) { try { for await (const [space, message] of app.messages) { backoff = 1000; // healthy traffic — reset + markStreamHealthy(); // Only forward inbound messages (ignore our own outbound echoes). if (message && message.direction && message.direction !== "inbound") { continue; @@ -364,11 +468,14 @@ async function normalizeEvent(space, message) { await deliver(JSON.stringify(event)); } console.error("photon-sidecar: inbound stream ended — re-subscribing"); + markStreamRecovering("inbound stream ended"); } catch (e) { + const reason = e && e.message ? e.message : String(e); console.error( "photon-sidecar: inbound stream errored — restarting: " + - (e && e.message ? e.message : String(e)) + reason ); + markStreamRecovering(reason); } await new Promise((r) => setTimeout(r, backoff + Math.random() * backoff * 0.2) @@ -530,7 +637,7 @@ const server = http.createServer(async (req, res) => { } try { if (req.url === "/healthz") { - return ok(res, {}); + return ok(res, { stream: streamHealthSnapshot() }); } if (req.url === "/shutdown") { ok(res, {}); diff --git a/tests/gateway/test_platform_reconnect.py b/tests/gateway/test_platform_reconnect.py index 1f4cf11f16a..80835f2146b 100644 --- a/tests/gateway/test_platform_reconnect.py +++ b/tests/gateway/test_platform_reconnect.py @@ -544,6 +544,24 @@ class TestRuntimeDisconnectQueuing: assert Platform.TELEGRAM in runner._failed_platforms assert runner._failed_platforms[Platform.TELEGRAM]["attempts"] == 0 + @pytest.mark.asyncio + async def test_retryable_runtime_error_reconnects_immediately(self): + """Runtime failures should not wait for the startup retry delay.""" + runner = _make_runner() + runner.stop = AsyncMock() + + adapter = StubAdapter(succeed=True) + adapter._set_fatal_error("sidecar_crashed", "bridge exited", retryable=True) + runner.adapters[Platform.TELEGRAM] = adapter + + before = time.monotonic() + await runner._handle_adapter_fatal_error(adapter) + after = time.monotonic() + + info = runner._failed_platforms[Platform.TELEGRAM] + assert info["attempts"] == 0 + assert before <= info["next_retry"] <= after + @pytest.mark.asyncio async def test_nonretryable_runtime_error_not_queued(self): """Non-retryable runtime errors should not be queued for reconnection.""" @@ -765,4 +783,3 @@ class TestPlatformSlashCommand: runner = _make_runner() out = await runner._handle_platform_command(self._make_event("/platform")) assert "Gateway platforms" in out - diff --git a/tests/plugins/platforms/photon/test_overflow_recovery.py b/tests/plugins/platforms/photon/test_overflow_recovery.py index 4724f546993..75d9af59d83 100644 --- a/tests/plugins/platforms/photon/test_overflow_recovery.py +++ b/tests/plugins/platforms/photon/test_overflow_recovery.py @@ -12,6 +12,8 @@ dying (issue #50185): ``retryable=True`` fatal so the gateway reconnect watcher revives the platform — instead of returning silently and leaving ``_inbound_loop`` spinning against a dead port. + 4. ``_monitor_sidecar_health`` promotes degraded upstream stream health + reported by ``/healthz`` into the same retryable reconnect path. No Node sidecar is spawned and no ports are bound. """ @@ -195,3 +197,40 @@ async def test_clean_shutdown_does_not_raise_fatal( assert adapter.has_fatal_error is False assert notified == [] + + +@pytest.mark.asyncio +async def test_degraded_stream_health_raises_retryable_fatal( + monkeypatch: pytest.MonkeyPatch, +) -> None: + adapter = _make_adapter(monkeypatch) + adapter._inbound_running = True + adapter._sidecar_health_interval = 0.0 + + async def _fake_call(path: str, payload: Dict[str, Any]) -> Any: + assert path == "/healthz" + return { + "ok": True, + "stream": { + "ok": False, + "state": "degraded", + "degradedForMs": 120000, + "lastIssue": "[spectrum.stream] stream interrupted; reconnecting", + }, + } + + notified: list[bool] = [] + + async def _fake_notify() -> None: + notified.append(True) + adapter._inbound_running = False + + monkeypatch.setattr(adapter, "_sidecar_call", _fake_call) + monkeypatch.setattr(adapter, "_notify_fatal_error", _fake_notify) + + await adapter._monitor_sidecar_health() + + assert adapter.has_fatal_error is True + assert adapter.fatal_error_code == "UPSTREAM_STREAM_DEGRADED" + assert adapter.fatal_error_retryable is True + assert notified == [True] diff --git a/tests/plugins/platforms/photon/test_spectrum_patch.py b/tests/plugins/platforms/photon/test_spectrum_patch.py index 2f1943fa119..a16774e1ffb 100644 --- a/tests/plugins/platforms/photon/test_spectrum_patch.py +++ b/tests/plugins/platforms/photon/test_spectrum_patch.py @@ -17,6 +17,15 @@ def test_sidecar_applies_spectrum_patch_before_importing_sdk() -> None: assert index.index("patchSpectrumTs();") < index.index('await import("spectrum-ts")') +def test_sidecar_healthz_reports_stream_health() -> None: + """Local process health must include upstream stream health.""" + index = Path("plugins/platforms/photon/sidecar/index.mjs").read_text(encoding="utf-8") + assert "function streamHealthSnapshot()" in index + assert 'return ok(res, { stream: streamHealthSnapshot() });' in index + assert "STREAM_INTERRUPTED_DEGRADE_COUNT" in index + assert "process.exit(75);" in index + + def test_spectrum_patch_preserves_text_when_single_attachment(tmp_path: Path) -> None: """The sidecar dependency patch must turn text+one attachment into group content.""" dist = tmp_path / "node_modules" / "spectrum-ts" / "dist"