hermes-agent/plugins/platforms/photon/sidecar/index.mjs
underthestars-zhy 4e4d27875f feat(photon): gRPC-native iMessage channel (no webhook)
Make Photon iMessage a first-class persistent-connection channel like
Discord/Slack, using the spectrum-ts gRPC stream for both directions.

- Inbound: the sidecar forwards the SDK's app.messages gRPC stream to the
  adapter over a loopback GET /inbound (NDJSON) instead of webhooks. Drops
  the aiohttp webhook server, HMAC signature verification, public URL, and
  PHOTON_WEBHOOK_* config; adapter reconnects with backoff.
- Management plane: device login uses client_id=photon-cli against the
  single dashboard host (Bearer), matching the official photon-hq/cli;
  find-or-create "Hermes Agent" project, enable Spectrum, rotate secret,
  register user (with phone dedup), surface the assigned iMessage line.
- SDK projectId is the project's spectrumProjectId, not the dashboard id;
  runtime creds persist to ~/.hermes/.env like every other channel.
- CLI: 6-step setup, webhook subcommands removed.
- Tests/docs updated for the gRPC flow; sidecar pins spectrum-ts ^1.17.1.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 21:03:58 -07:00

402 lines
13 KiB
JavaScript

// Hermes Agent — Photon Spectrum sidecar
//
// Spawned by `plugins/platforms/photon/adapter.py` to bridge BOTH directions
// of messaging to Photon's Spectrum platform via the `spectrum-ts` SDK (the
// SDK is TypeScript-only, so a Node sidecar is unavoidable — there is no
// Python SDK and no public HTTP message API).
//
// Inbound (gRPC -> Hermes): the SDK's `app.messages` async iterator is a
// long-lived gRPC stream. We serialize each `[space, message]` to a
// normalized JSON event and stream it to the Python adapter over a
// loopback `GET /inbound` (NDJSON). We pause pulling from the stream while
// no consumer is attached so a backlog isn't pulled-and-lost before the
// gateway connects.
// Outbound (Hermes -> gRPC): `/send` and `/typing` drive `space.send(...)` /
// `space.startTyping()` on the SDK.
//
// Protocol (all requests require `X-Hermes-Sidecar-Token: ${TOKEN}`):
// - GET /inbound -> 200 NDJSON stream; one JSON event per line, blank
// lines are heartbeats. One consumer at a time.
// - POST /healthz -> {"ok": true}
// - POST /send -> {"ok": true, "messageId": "..."}
// body: {"spaceId": "...", "text": "...", "replyTo": "..." | null}
// - POST /send-attachment -> {"ok": true, "messageId": "..."}
// body: {"spaceId": "...", "path": "...", "name": "..." | null,
// "mimeType": "..." | null, "caption": "..." | null,
// "kind": "attachment" | "voice", "replyTo": "..." | null}
// - POST /typing -> {"ok": true}
// body: {"spaceId": "..."}
// - POST /shutdown -> {"ok": true}; then process exits
//
// On SIGINT/SIGTERM the sidecar calls `app.stop()` (3s graceful) before
// exiting. Logs go to stderr; Python supervises restart.
//
// Env vars (required; the sidecar exits with code 2 if any is missing):
// PHOTON_PROJECT_ID (== the project's spectrumProjectId)
// PHOTON_PROJECT_SECRET
// PHOTON_SIDECAR_TOKEN
// Optional:
// PHOTON_SIDECAR_PORT (default 8789)
// PHOTON_SIDECAR_BIND (default 127.0.0.1)
import { timingSafeEqual } from "node:crypto";
import { once } from "node:events";
import http from "node:http";
// Runtime configuration, read once from the environment at startup.
const projectId = process.env.PHOTON_PROJECT_ID;
const projectSecret = process.env.PHOTON_PROJECT_SECRET;
// Falls back to 8789 when PHOTON_SIDECAR_PORT is unset or empty.
const port = Number.parseInt(process.env.PHOTON_SIDECAR_PORT || "8789", 10);
const bind = process.env.PHOTON_SIDECAR_BIND || "127.0.0.1";
const sharedToken = process.env.PHOTON_SIDECAR_TOKEN;
if (!projectId || !projectSecret || !sharedToken) {
  console.error(
    "photon-sidecar: PHOTON_PROJECT_ID, PHOTON_PROJECT_SECRET and " +
      "PHOTON_SIDECAR_TOKEN must all be set."
  );
  process.exit(2);
}
// A non-numeric or out-of-range port would otherwise only surface later as
// an exception thrown by `listen()`. Reject it here with a readable message
// and the same exit code as the other configuration errors.
if (!Number.isInteger(port) || port < 0 || port > 65535) {
  console.error(
    "photon-sidecar: PHOTON_SIDECAR_PORT must be an integer between 0 and " +
      `65535 (got ${JSON.stringify(process.env.PHOTON_SIDECAR_PORT)}).`
  );
  process.exit(2);
}
// Lazy-load spectrum-ts so a missing install fails with a clear message
// instead of a cryptic module-resolution error during import.
// SDK entry points, filled in by the dynamic imports below. They stay
// module-level bindings because the rest of the file refers to them by name.
let Spectrum;
let imessage;
let attachment;
let voice;
try {
  const core = await import("spectrum-ts");
  const provider = await import("spectrum-ts/providers/imessage");
  Spectrum = core.Spectrum;
  attachment = core.attachment;
  voice = core.voice;
  imessage = provider.imessage;
} catch (e) {
  // Any import failure ends the process with exit code 3; the underlying
  // error is appended so the real cause is still visible in the log.
  const detail = e && e.stack ? e.stack : String(e);
  console.error(
    "photon-sidecar: spectrum-ts is not installed. Run `npm install` " +
      "inside plugins/platforms/photon/sidecar/. Original error: " +
      detail
  );
  process.exit(3);
}
// Create the Spectrum client from the configured project credentials, with
// the iMessage provider as its only provider. This is a top-level await, so
// none of the code below runs until the SDK call has settled.
const app = await Spectrum({
projectId,
projectSecret,
providers: [imessage.config()],
});
// ---------------------------------------------------------------------------
// Inbound: forward `app.messages` (gRPC stream) to the Python consumer.
// At most one Python consumer is attached at a time (the gateway adapter).
// The HTTP response of the currently attached /inbound consumer, or null
// when nobody is attached.
let consumerRes = null;
// Resolve callbacks of callers parked in waitForConsumer().
let consumerWaiters = [];

// Returns a promise that is already resolved when a consumer is attached,
// and otherwise stays pending until its resolver is called from the
// `consumerWaiters` queue.
function waitForConsumer() {
  if (!consumerRes) {
    return new Promise((wake) => {
      consumerWaiters.push(wake);
    });
  }
  return Promise.resolve();
}
// Installs `res` as the active consumer and wakes every caller that was
// parked waiting for one. The waiter list is swapped for a fresh array
// before any callback runs, so a waiter registered during the wake-up is
// queued for the next consumer rather than being called now.
function setConsumer(res) {
  consumerRes = res;
  const parked = consumerWaiters;
  consumerWaiters = [];
  parked.forEach((wake) => {
    wake();
  });
}
// Detaches `res`, but only if it is still the active consumer. A stale
// response (one that has already been replaced) leaves the current
// consumer untouched.
function clearConsumer(res) {
  if (consumerRes !== res) return;
  consumerRes = null;
}
// Resolves `true` once `res` emits "drain", or `false` if it emits "close"
// or "error" first. All three listeners are removed as soon as one fires so
// repeated calls do not accumulate listeners on a long-lived response.
function drainedBeforeClose(res) {
  return new Promise((resolve) => {
    const settle = (drained) => {
      res.off("drain", onDrain);
      res.off("close", onGone);
      res.off("error", onGone);
      resolve(drained);
    };
    const onDrain = () => settle(true);
    const onGone = () => settle(false);
    res.on("drain", onDrain);
    res.on("close", onGone);
    res.on("error", onGone);
  });
}

// Write one NDJSON line to the active consumer. Blocks until a consumer is
// connected. If the consumer is already finished, the write throws, or the
// connection goes away while we are waiting out backpressure, the consumer
// is detached and the line is retried on the next one.
//
// This favours redelivery over loss: a line that was buffered when the
// connection dropped may reach the next consumer a second time, but it is
// never silently discarded here.
async function deliver(line) {
  for (;;) {
    await waitForConsumer();
    const res = consumerRes;
    if (!res) continue;
    // Writing to an ended/destroyed response does not throw synchronously,
    // so detect that state explicitly instead of relying on the catch below.
    if (res.writableEnded || res.destroyed) {
      clearConsumer(res);
      continue;
    }
    let delivered = false;
    try {
      // `write()` returning false means the line was buffered; wait for the
      // buffer to drain, but give up if the response closes first — "drain"
      // would never arrive and this loop would otherwise stall forever.
      delivered = res.write(line + "\n") || (await drainedBeforeClose(res));
    } catch {
      delivered = false;
    }
    if (delivered) return;
    clearConsumer(res);
  }
}
// Reduces an SDK content object to the small, JSON-safe shape forwarded to
// the consumer:
//   - non-object / missing content -> { type: "unknown" }
//   - text                         -> { type: "text", text }
//   - attachment                   -> metadata only (id, name, mimeType, size)
//   - anything else                -> { type } (or "unknown" if type is falsy)
function normalizeContent(content) {
  const isObject = Boolean(content) && typeof content === "object";
  if (!isObject) {
    return { type: "unknown" };
  }

  switch (content.type) {
    case "text":
      return { type: "text", text: content.text || "" };

    case "attachment": {
      // Only metadata is surfaced; the bytes are not read here, which keeps
      // the event small. A non-numeric size is reported as null.
      let size = null;
      if (typeof content.size === "number") {
        size = content.size;
      }
      return {
        type: "attachment",
        id: content.id ?? null,
        name: content.name ?? null,
        mimeType: content.mimeType ?? null,
        size,
      };
    }

    default:
      return { type: content.type || "unknown" };
  }
}
// Builds the normalized inbound event for one `[space, message]` pair.
// Space fields come from `space` first, then from `message.space`, then a
// default. Returns null (after logging) if anything throws while reading
// the inputs, so one malformed message cannot break the caller.
function normalizeEvent(space, message) {
  try {
    const fallbackSpace = message.space || {};
    const rawTimestamp = message.timestamp;

    // Dates become ISO strings, other truthy values are stringified, and
    // anything falsy is reported as null.
    let timestamp = null;
    if (rawTimestamp instanceof Date) {
      timestamp = rawTimestamp.toISOString();
    } else if (rawTimestamp) {
      timestamp = String(rawTimestamp);
    }

    const sender = message.sender;
    const senderId = sender ? sender.id : null;

    // iMessage spaces carry `type` ("dm"|"group") and `phone` directly.
    const spaceInfo = {
      id: space.id ?? fallbackSpace.id ?? null,
      type: space.type ?? fallbackSpace.type ?? "dm",
      phone: space.phone ?? fallbackSpace.phone ?? null,
    };

    return {
      messageId: message.id ?? null,
      platform: message.platform || space.__platform || "iMessage",
      space: spaceInfo,
      sender: { id: senderId },
      content: normalizeContent(message.content),
      timestamp,
    };
  } catch (err) {
    console.error(
      "photon-sidecar: failed to normalize inbound message: " + String(err)
    );
    return null;
  }
}
// Inbound pump: iterate `app.messages` and hand each normalized event to
// deliver(), one at a time. Because the loop awaits every delivery, the next
// message is not pulled until the previous one has been handed off. If the
// iteration throws, the error is logged and the pump stops; nothing here
// restarts it.
(async () => {
  for await (const pair of app.messages) {
    const [space, message] = pair;
    // Only forward inbound messages (ignore our own outbound echoes). A
    // message without a `direction` is treated as inbound.
    const direction = message && message.direction;
    if (direction && direction !== "inbound") {
      continue;
    }
    const event = normalizeEvent(space, message);
    if (event) {
      await deliver(JSON.stringify(event));
    }
  }
})().catch((e) => {
  console.error(
    "photon-sidecar: inbound stream errored: " +
      (e && e.stack ? e.stack : String(e))
  );
});
// ---------------------------------------------------------------------------
// HTTP control + inbound server (loopback only).
// Collects the whole request body and parses it as JSON.
//   - `req` is consumed as an async iterable of Buffer chunks.
//   - An empty body yields `{}`.
//   - Anything that is not valid JSON rejects with Error("invalid JSON body").
// Chunks are concatenated before decoding so a multi-byte UTF-8 character
// split across two chunks is decoded correctly.
async function readBody(req) {
  const parts = [];
  for await (const part of req) {
    parts.push(part);
  }
  const text = Buffer.concat(parts).toString("utf-8");
  if (text.length === 0) {
    return {};
  }
  let parsed;
  try {
    parsed = JSON.parse(text);
  } catch {
    throw new Error("invalid JSON body");
  }
  return parsed;
}
// Ends the response with HTTP 401 and a JSON `{ ok: false, error }` body.
function unauthorized(res) {
  const payload = JSON.stringify({ ok: false, error: "unauthorized" });
  res.statusCode = 401;
  res.setHeader("Content-Type", "application/json");
  res.end(payload);
}
// Ends the response with HTTP 400; `msg` is returned to the caller verbatim
// as the `error` field of the JSON body.
function badRequest(res, msg) {
  const payload = JSON.stringify({ ok: false, error: msg });
  res.statusCode = 400;
  res.setHeader("Content-Type", "application/json");
  res.end(payload);
}
// Ends the response with HTTP 500 and a fixed, generic error body.
// The body deliberately carries no exception text or stack trace: the real
// error is logged by the caller, and the client only needs a failure signal
// (this holds even though the server listens on loopback).
function serverError(res) {
  const payload = JSON.stringify({
    ok: false,
    error: "internal sidecar error",
  });
  res.statusCode = 500;
  res.setHeader("Content-Type", "application/json");
  res.end(payload);
}
// Ends the response with HTTP 200 and a JSON body of `{ ok: true }` merged
// with the own enumerable properties of `data` (which may be omitted).
function ok(res, data) {
  const payload = JSON.stringify(Object.assign({ ok: true }, data));
  res.statusCode = 200;
  res.setHeader("Content-Type", "application/json");
  res.end(payload);
}
// Attaches `res` as the NDJSON consumer for inbound events.
//
// - Sends the 200 + headers immediately, so the client sees the stream open
//   without having to wait for the first event or heartbeat.
// - One consumer at a time: a fresh connection (e.g. after a reconnect)
//   supersedes the previous one, which is ended.
// - Writes a blank-line heartbeat every 25 s while the response is open.
// - Detaches and stops the heartbeat when either side of the exchange
//   closes or errors. Cleanup is idempotent and only detaches `res` if it
//   is still the active consumer.
function handleInbound(req, res) {
  res.statusCode = 200;
  res.setHeader("Content-Type", "application/x-ndjson");
  res.setHeader("Cache-Control", "no-store");
  res.setHeader("Connection", "keep-alive");
  // Node holds headers back until the first body write; push them out now.
  res.flushHeaders();

  if (consumerRes && consumerRes !== res) {
    try {
      consumerRes.end();
    } catch {
      /* ignore: the old consumer is being discarded anyway */
    }
  }
  setConsumer(res);

  // Heartbeat keeps the socket warm through idle periods and lets the Python
  // side detect a dead pipe promptly.
  const heartbeat = setInterval(() => {
    // Never write to a response that has been ended (superseded) or torn
    // down; that would only raise a write-after-end error.
    if (res.writableEnded || res.destroyed) return;
    try {
      res.write("\n");
    } catch {
      /* ignore: cleanup runs from the close/error handlers below */
    }
  }, 25000);

  const cleanup = () => {
    clearInterval(heartbeat);
    clearConsumer(res);
  };
  req.on("close", cleanup);
  req.on("aborted", cleanup);
  // The response's own "close" covers both a dropped connection and a
  // response that was ended because a newer consumer replaced it.
  res.on("close", cleanup);
  res.on("error", cleanup);
}
// Looks up the SDK space object for `spaceId`, trying several accessor
// shapes in order so that small SDK API drift is tolerated:
//   1. app.space(spaceId)
//   2. app.spaces.get(spaceId)
//   3. imessage(app).space({ id: spaceId })
// The first accessor that exists decides the outcome for shapes 1 and 2
// (their errors propagate). An error from shape 3 is swallowed and the
// function falls through to the final throw.
// Throws Error("unable to resolve space id ...") when nothing matched.
async function resolveSpace(spaceId) {
  const hasDirectAccessor = typeof app.space === "function";
  if (hasDirectAccessor) {
    const direct = await app.space(spaceId);
    return direct;
  }

  const registry = app.spaces;
  if (registry && typeof registry.get === "function") {
    const fromRegistry = await registry.get(spaceId);
    return fromRegistry;
  }

  if (imessage) {
    const provider = imessage(app);
    if (typeof provider.space === "function") {
      try {
        const narrowed = await provider.space({ id: spaceId });
        return narrowed;
      } catch {
        /* fall through */
      }
    }
  }

  throw new Error(`unable to resolve space id ${spaceId}`);
}
// Returns true only when `presented` is a string equal to the configured
// sidecar token. The comparison is constant-time so the token cannot be
// recovered byte-by-byte from response timing.
function isAuthorized(presented) {
  if (typeof presented !== "string") return false;
  const given = Buffer.from(presented, "utf-8");
  const expected = Buffer.from(sharedToken, "utf-8");
  // timingSafeEqual throws on a length mismatch, so gate on length first.
  return given.length === expected.length && timingSafeEqual(given, expected);
}

// HTTP entry point for the sidecar. Every request must carry the shared
// token in `X-Hermes-Sidecar-Token`, otherwise it gets 401.
//
// Routes:
//   GET  /inbound          long-lived NDJSON stream (see handleInbound)
//   POST /healthz          -> { ok: true }
//   POST /shutdown         -> { ok: true }, then SIGTERM to this process
//   POST /send             -> { ok: true, messageId }
//   POST /send-attachment  -> { ok: true, messageId }
//   POST /typing           -> { ok: true }
// Any other method gets 405 and any other POST path gets 404.
//
// Error mapping: an unreadable or malformed JSON body and missing required
// fields are 400. Any other failure while handling a request is logged and
// answered with a generic 500.
const server = http.createServer(async (req, res) => {
  if (!isAuthorized(req.headers["x-hermes-sidecar-token"])) {
    return unauthorized(res);
  }
  // Long-lived inbound NDJSON stream.
  if (req.method === "GET" && req.url === "/inbound") {
    return handleInbound(req, res);
  }
  if (req.method !== "POST") {
    res.statusCode = 405;
    return res.end();
  }
  try {
    if (req.url === "/healthz") {
      return ok(res, {});
    }
    if (req.url === "/shutdown") {
      // Reply first, then signal ourselves shortly after so the response
      // has a chance to go out before the process starts stopping.
      ok(res, {});
      setTimeout(() => process.kill(process.pid, "SIGTERM"), 50);
      return;
    }

    // A body that cannot be read or parsed is the client's fault: answer
    // 400 rather than letting it fall into the generic 500 handler below.
    let body;
    try {
      body = await readBody(req);
    } catch (e) {
      console.error(
        "photon-sidecar: rejected request with unreadable body: " + String(e)
      );
      return badRequest(res, "invalid JSON body");
    }

    if (req.url === "/send") {
      const { spaceId, text, replyTo } = body || {};
      if (!spaceId || typeof text !== "string") {
        return badRequest(res, "spaceId and text are required");
      }
      const space = await resolveSpace(spaceId);
      const result = replyTo
        ? await space.send(text, { replyTo })
        : await space.send(text);
      return ok(res, { messageId: result?.id || result?.messageId || null });
    }

    if (req.url === "/send-attachment") {
      const { spaceId, path, name, mimeType, caption, kind, replyTo } =
        body || {};
      if (!spaceId || typeof path !== "string" || !path) {
        return badRequest(res, "spaceId and path are required");
      }
      const space = await resolveSpace(spaceId);
      // spectrum-ts infers name + MIME from the file extension; pass
      // overrides only when Hermes supplied them so a known-good
      // inference isn't clobbered with an empty string.
      const opts = {};
      if (name) opts.name = name;
      if (mimeType) opts.mimeType = mimeType;
      const builder =
        kind === "voice"
          ? voice(path, Object.keys(opts).length ? opts : undefined)
          : attachment(path, Object.keys(opts).length ? opts : undefined);
      const sendOpts = replyTo ? { replyTo } : undefined;
      const result = sendOpts
        ? await space.send(builder, sendOpts)
        : await space.send(builder);
      // iMessage delivers the caption as a separate bubble; send it
      // after the media so the attachment renders first. A failed caption
      // is logged but does not fail the request: the media already went out.
      if (caption && typeof caption === "string") {
        try {
          await space.send(caption);
        } catch (e) {
          console.error(
            "photon-sidecar: attachment sent but caption failed: " +
              (e && e.stack ? e.stack : String(e))
          );
        }
      }
      return ok(res, { messageId: result?.id || result?.messageId || null });
    }

    if (req.url === "/typing") {
      const { spaceId } = body || {};
      if (!spaceId) return badRequest(res, "spaceId is required");
      const space = await resolveSpace(spaceId);
      // Try the known method names in turn; if the space exposes none of
      // them the request still succeeds as a no-op.
      if (typeof space.startTyping === "function") {
        await space.startTyping();
      } else if (typeof space.typing === "function") {
        await space.typing();
      } else if (typeof space.setTyping === "function") {
        await space.setTyping(true);
      }
      return ok(res, {});
    }

    res.statusCode = 404;
    res.setHeader("Content-Type", "application/json");
    return res.end(JSON.stringify({ ok: false, error: "not found" }));
  } catch (e) {
    console.error(
      "photon-sidecar: handler error: " +
        (e && e.stack ? e.stack : String(e))
    );
    // The client gets a generic 500; the real error stays in the log above.
    return serverError(res);
  }
});
// Start accepting connections on the configured interface and port, and
// announce it on stderr once the server is listening.
const announceListening = () => {
  console.error(`photon-sidecar: listening on ${bind}:${port}`);
};
server.listen(port, bind, announceListening);
// Set once shutdown has begun so a second signal does not restart it.
let shutdownStarted = false;

// Graceful stop, triggered by SIGINT/SIGTERM.
//
// 1. Give `app.stop()` up to 3 s to finish (a failure is logged, not fatal).
// 2. Stop accepting connections and exit 0 once the HTTP server has closed.
// 3. End the attached /inbound stream and close the remaining connections.
//    Without this the long-lived stream keeps the server open and the
//    process would always fall through to the exit(1) fallback below.
// 4. As a last resort, exit 1 if the server still has not closed after
//    500 ms.
//
// `signal` is only used for the log line.
async function shutdown(signal) {
  if (shutdownStarted) return;
  shutdownStarted = true;
  console.error(`photon-sidecar: received ${signal}, stopping...`);

  let graceTimer;
  try {
    await Promise.race([
      app.stop(),
      new Promise((resolve) => {
        graceTimer = setTimeout(resolve, 3000);
      }),
    ]);
  } catch (e) {
    console.error("photon-sidecar: app.stop() failed: " + String(e));
  } finally {
    // Don't leave the grace timer pending once the race is decided.
    clearTimeout(graceTimer);
  }

  server.close(() => process.exit(0));

  if (consumerRes) {
    try {
      consumerRes.end();
    } catch {
      /* ignore: we are exiting regardless */
    }
  }
  // closeAllConnections() is not available on older Node releases; there
  // the 500 ms fallback below still guarantees the process exits. Deferring
  // by one tick gives the stream terminator a chance to be written first.
  if (typeof server.closeAllConnections === "function") {
    setImmediate(() => server.closeAllConnections());
  }

  setTimeout(() => process.exit(1), 500).unref();
}
// Route both termination signals through shutdown(), passing the signal
// name along.
for (const signalName of ["SIGINT", "SIGTERM"]) {
  process.on(signalName, () => shutdown(signalName));
}