fix(photon): recover degraded upstream stream

This commit is contained in:
helix4u 2026-06-22 17:52:46 -06:00 committed by Teknium
parent 34bd6a0db5
commit 06cbc3bae9
6 changed files with 241 additions and 4 deletions

View file

@ -3494,7 +3494,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
self._failed_platforms[adapter.platform] = {
"config": platform_config,
"attempts": 0,
"next_retry": time.monotonic() + 30,
"next_retry": time.monotonic(),
}
logger.info(
"%s queued for background reconnection",
@ -6367,6 +6367,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
for _ in range(30):
if not self._running:
return
if self._failed_platforms:
break
await asyncio.sleep(1)
continue

View file

@ -91,7 +91,11 @@ _SIDECAR_DIR = Path(__file__).parent / "sidecar"
_PHOTON_RETRYABLE_PATTERNS = (
"internal sidecar error",
"upstream connect error",
"upstream unavailable",
"connection dropped",
"reset reason: overflow",
"upstream_overflow",
"upstream_unavailable",
)
# Minimum seconds between typing-indicator calls for the same chat.
@ -235,8 +239,10 @@ class PhotonAdapter(BasePlatformAdapter):
self._sidecar_proc: Optional[subprocess.Popen] = None
self._sidecar_supervisor_task: Optional[asyncio.Task] = None
self._inbound_task: Optional[asyncio.Task] = None
self._sidecar_health_task: Optional[asyncio.Task] = None
self._inbound_running = False
self._http_client: Optional["httpx.AsyncClient"] = None
self._sidecar_health_interval = 15.0
# Lightweight in-memory dedup. The gRPC stream is at-least-once, so we
# may see the same messageId more than once (e.g. after a reconnect).
self._seen_messages: Dict[str, float] = {}
@ -370,6 +376,9 @@ class PhotonAdapter(BasePlatformAdapter):
self._inbound_task = asyncio.get_event_loop().create_task(
self._inbound_loop()
)
self._sidecar_health_task = asyncio.get_event_loop().create_task(
self._monitor_sidecar_health()
)
self._mark_connected()
logger.info(
@ -380,6 +389,17 @@ class PhotonAdapter(BasePlatformAdapter):
async def disconnect(self) -> None:
self._inbound_running = False
if self._sidecar_health_task is not None:
task = self._sidecar_health_task
self._sidecar_health_task = None
task.cancel()
if task is not asyncio.current_task():
try:
await task
except asyncio.CancelledError:
pass
except Exception:
pass
if self._inbound_task is not None:
self._inbound_task.cancel()
try:
@ -440,6 +460,49 @@ class PhotonAdapter(BasePlatformAdapter):
await asyncio.sleep(backoff)
backoff = min(backoff * 2, 30.0)
async def _monitor_sidecar_health(self) -> None:
"""Promote degraded upstream Photon stream health into reconnect.
The sidecar HTTP process can stay alive while spectrum-ts repeatedly
fails to maintain the upstream inbound gRPC stream. Polling `/healthz`
keeps that from becoming a silent inbound outage.
"""
while self._inbound_running:
await asyncio.sleep(self._sidecar_health_interval)
if not self._inbound_running:
break
try:
data = await self._sidecar_call("/healthz", {})
except asyncio.CancelledError:
raise
except Exception as exc:
logger.debug("[photon] sidecar health check failed: %s", exc)
continue
stream = data.get("stream") if isinstance(data, dict) else None
if not isinstance(stream, dict) or stream.get("ok") is not False:
continue
state = str(stream.get("state") or "unknown")
degraded_for_ms = stream.get("degradedForMs")
last_issue = str(stream.get("lastIssue") or "unknown stream issue")
message = (
"Photon upstream stream degraded"
f" (state={state}, degradedForMs={degraded_for_ms}): "
f"{last_issue}"
)
logger.error("[photon] %s", message)
self._set_fatal_error(
"UPSTREAM_STREAM_DEGRADED",
message,
retryable=True,
)
try:
await self._notify_fatal_error()
except Exception as exc: # pragma: no cover - defensive
logger.warning("[photon] fatal-error notification failed: %s", exc)
break
async def _on_inbound_line(self, line: str) -> None:
try:
event = json.loads(line)

View file

@ -80,6 +80,109 @@ const E164_RE = /^\+\d{6,}$/;
const MAX_KNOWN_SPACES = 2048;
const MAX_KNOWN_MESSAGES = 1024;
const MAX_REACTION_HANDLES = 512;
const STREAM_DEGRADED_RESTART_MS =
Number(process.env.PHOTON_STREAM_DEGRADED_RESTART_MS) || 90 * 1000;
const STREAM_INTERRUPTED_DEGRADE_COUNT =
Number(process.env.PHOTON_STREAM_INTERRUPTED_DEGRADE_COUNT) || 3;
const streamHealth = {
state: "starting",
degradedSince: null,
lastHealthyAt: null,
lastIssueAt: null,
lastIssue: null,
issueCount: 0,
};
let streamRestartTimer = null;
function streamHealthSnapshot() {
const now = Date.now();
const degradedForMs =
streamHealth.degradedSince === null ? 0 : now - streamHealth.degradedSince;
return {
ok: streamHealth.state !== "degraded",
state: streamHealth.state,
degradedForMs,
restartAfterMs: STREAM_DEGRADED_RESTART_MS,
lastHealthyAt: streamHealth.lastHealthyAt,
lastIssueAt: streamHealth.lastIssueAt,
lastIssue: streamHealth.lastIssue,
issueCount: streamHealth.issueCount,
};
}
function markStreamHealthy() {
streamHealth.state = "healthy";
streamHealth.degradedSince = null;
streamHealth.lastHealthyAt = new Date().toISOString();
streamHealth.issueCount = 0;
if (streamRestartTimer) {
clearTimeout(streamRestartTimer);
streamRestartTimer = null;
}
}
function scheduleStreamRestart() {
if (STREAM_DEGRADED_RESTART_MS <= 0 || streamRestartTimer) return;
streamRestartTimer = setTimeout(() => {
streamRestartTimer = null;
if (streamHealth.state !== "degraded" || streamHealth.degradedSince === null) {
return;
}
const degradedForMs = Date.now() - streamHealth.degradedSince;
if (degradedForMs < STREAM_DEGRADED_RESTART_MS) {
scheduleStreamRestart();
return;
}
console.error(
`photon-sidecar: upstream stream degraded for ${degradedForMs}ms; ` +
"exiting so Hermes can restart the Photon adapter"
);
process.exit(75);
}, STREAM_DEGRADED_RESTART_MS + 1000);
streamRestartTimer.unref();
}
function markStreamDegraded(reason) {
const now = Date.now();
if (streamHealth.state !== "degraded") {
streamHealth.degradedSince = now;
}
streamHealth.state = "degraded";
streamHealth.lastIssueAt = new Date(now).toISOString();
streamHealth.lastIssue = reason;
streamHealth.issueCount += 1;
scheduleStreamRestart();
}
function markStreamRecovering(reason) {
if (streamHealth.state !== "recovering") {
streamHealth.issueCount = 0;
}
streamHealth.state = "recovering";
streamHealth.lastIssueAt = new Date().toISOString();
streamHealth.lastIssue = reason;
streamHealth.issueCount += 1;
if (streamHealth.issueCount >= STREAM_INTERRUPTED_DEGRADE_COUNT) {
markStreamDegraded(reason);
}
}
const originalConsoleError = console.error.bind(console);
console.error = (...args) => {
const text = args
.map((arg) => (arg && arg.stack ? arg.stack : String(arg)))
.join(" ");
if (text.includes("[spectrum.stream]")) {
const reason = text.split("\n", 1)[0];
if (text.includes("persistently failing")) {
markStreamDegraded(reason);
} else if (text.includes("stream interrupted")) {
markStreamRecovering(reason);
}
}
originalConsoleError(...args);
};
if (!projectId || !projectSecret || !sharedToken) {
console.error(
@ -353,6 +456,7 @@ async function normalizeEvent(space, message) {
try {
for await (const [space, message] of app.messages) {
backoff = 1000; // healthy traffic — reset
markStreamHealthy();
// Only forward inbound messages (ignore our own outbound echoes).
if (message && message.direction && message.direction !== "inbound") {
continue;
@ -364,11 +468,14 @@ async function normalizeEvent(space, message) {
await deliver(JSON.stringify(event));
}
console.error("photon-sidecar: inbound stream ended — re-subscribing");
markStreamRecovering("inbound stream ended");
} catch (e) {
const reason = e && e.message ? e.message : String(e);
console.error(
"photon-sidecar: inbound stream errored — restarting: " +
(e && e.message ? e.message : String(e))
reason
);
markStreamRecovering(reason);
}
await new Promise((r) =>
setTimeout(r, backoff + Math.random() * backoff * 0.2)
@ -530,7 +637,7 @@ const server = http.createServer(async (req, res) => {
}
try {
if (req.url === "/healthz") {
return ok(res, {});
return ok(res, { stream: streamHealthSnapshot() });
}
if (req.url === "/shutdown") {
ok(res, {});

View file

@ -544,6 +544,24 @@ class TestRuntimeDisconnectQueuing:
assert Platform.TELEGRAM in runner._failed_platforms
assert runner._failed_platforms[Platform.TELEGRAM]["attempts"] == 0
@pytest.mark.asyncio
async def test_retryable_runtime_error_reconnects_immediately(self):
"""Runtime failures should not wait for the startup retry delay."""
runner = _make_runner()
runner.stop = AsyncMock()
adapter = StubAdapter(succeed=True)
adapter._set_fatal_error("sidecar_crashed", "bridge exited", retryable=True)
runner.adapters[Platform.TELEGRAM] = adapter
before = time.monotonic()
await runner._handle_adapter_fatal_error(adapter)
after = time.monotonic()
info = runner._failed_platforms[Platform.TELEGRAM]
assert info["attempts"] == 0
assert before <= info["next_retry"] <= after
@pytest.mark.asyncio
async def test_nonretryable_runtime_error_not_queued(self):
"""Non-retryable runtime errors should not be queued for reconnection."""
@ -765,4 +783,3 @@ class TestPlatformSlashCommand:
runner = _make_runner()
out = await runner._handle_platform_command(self._make_event("/platform"))
assert "Gateway platforms" in out

View file

@ -12,6 +12,8 @@ dying (issue #50185):
``retryable=True`` fatal so the gateway reconnect watcher revives the
platform instead of returning silently and leaving ``_inbound_loop``
spinning against a dead port.
4. ``_monitor_sidecar_health`` promotes degraded upstream stream health
reported by ``/healthz`` into the same retryable reconnect path.
No Node sidecar is spawned and no ports are bound.
"""
@ -195,3 +197,40 @@ async def test_clean_shutdown_does_not_raise_fatal(
assert adapter.has_fatal_error is False
assert notified == []
@pytest.mark.asyncio
async def test_degraded_stream_health_raises_retryable_fatal(
monkeypatch: pytest.MonkeyPatch,
) -> None:
adapter = _make_adapter(monkeypatch)
adapter._inbound_running = True
adapter._sidecar_health_interval = 0.0
async def _fake_call(path: str, payload: Dict[str, Any]) -> Any:
assert path == "/healthz"
return {
"ok": True,
"stream": {
"ok": False,
"state": "degraded",
"degradedForMs": 120000,
"lastIssue": "[spectrum.stream] stream interrupted; reconnecting",
},
}
notified: list[bool] = []
async def _fake_notify() -> None:
notified.append(True)
adapter._inbound_running = False
monkeypatch.setattr(adapter, "_sidecar_call", _fake_call)
monkeypatch.setattr(adapter, "_notify_fatal_error", _fake_notify)
await adapter._monitor_sidecar_health()
assert adapter.has_fatal_error is True
assert adapter.fatal_error_code == "UPSTREAM_STREAM_DEGRADED"
assert adapter.fatal_error_retryable is True
assert notified == [True]

View file

@ -17,6 +17,15 @@ def test_sidecar_applies_spectrum_patch_before_importing_sdk() -> None:
assert index.index("patchSpectrumTs();") < index.index('await import("spectrum-ts")')
def test_sidecar_healthz_reports_stream_health() -> None:
"""Local process health must include upstream stream health."""
index = Path("plugins/platforms/photon/sidecar/index.mjs").read_text(encoding="utf-8")
assert "function streamHealthSnapshot()" in index
assert 'return ok(res, { stream: streamHealthSnapshot() });' in index
assert "STREAM_INTERRUPTED_DEGRADE_COUNT" in index
assert "process.exit(75);" in index
def test_spectrum_patch_preserves_text_when_single_attachment(tmp_path: Path) -> None:
"""The sidecar dependency patch must turn text+one attachment into group content."""
dist = tmp_path / "node_modules" / "spectrum-ts" / "dist"