feat(gateway): add Photon Spectrum (iMessage) platform plugin

First-class iMessage support via Photon's managed Spectrum platform.
Targeted as a successor to the BlueBubbles adapter — Photon allocates
the iMessage line, handles delivery, and abuse-prevention so users
don't have to run their own Mac relay. Free tier uses Photon's shared
line pool.

Architecture:
- Inbound: signed JSON webhooks (X-Spectrum-Signature, HMAC-SHA256)
  delivered to a local aiohttp listener. Dedupes on message.id,
  rejects deliveries with >5min timestamp drift.
- Outbound: small supervised Node sidecar that runs the spectrum-ts
  SDK. Photon does not currently expose a public HTTP send-message
  endpoint; the sidecar is the only way to call Space.send() today.
  When Photon ships an HTTP send endpoint we collapse the sidecar
  into _sidecar_send and drop the Node dep — every other layer of
  the plugin stays the same.
- Setup: 'hermes photon login' runs the RFC 8628 device-code flow;
  'hermes photon setup' creates a Spectrum-enabled project, creates
  a shared user (free tier), installs the sidecar's npm deps.
- Webhook management: 'hermes photon webhook register|list|delete'.
- Credentials persisted under credential_pool.photon /
  credential_pool.photon_project in ~/.hermes/auth.json.

Plugin path (not built-in) — per current policy (May 2026), all new
platforms ship under plugins/platforms/. Registers itself via
ctx.register_platform() + ctx.register_cli_command(), zero edits to
core gateway code.

Tests cover:
- HMAC-SHA256 signature verification (happy path, tampered body,
  wrong secret, drift, missing v0 prefix, empty inputs, non-integer
  timestamp)
- Inbound dispatch for text DMs, group ids (any;+;...), and
  attachment metadata markers
- Deduplication window
- check_requirements gating when Node is absent
- Device-code flow: request, header-based token return,
  body-fallback token return, access_denied propagation
- Project/user/webhook API clients with mocked httpx

Known limitations (current Photon API):
- Attachments are metadata only — no download URL yet
- Outbound attachment send not wired (sidecar can add easily)
- Reactions / message effects not exposed yet

Docs: website/docs/user-guide/messaging/photon.md + sidebar entry.
This commit is contained in:
Teknium 2026-05-25 18:55:03 -07:00
parent 6e7033bb4c
commit 5b4e431e8c
15 changed files with 2587 additions and 0 deletions

View file

@ -0,0 +1,117 @@
# Photon iMessage platform plugin
This plugin connects Hermes Agent to iMessage (and WhatsApp Business +
future Spectrum interfaces) through [Photon][photon] — a managed
service that handles the iMessage line allocation, delivery, and
abuse-prevention layer so users don't have to run their own Mac
relay.
The free tier uses Photon's shared iMessage line pool (`type: shared`)
and is the path we recommend for everyone who doesn't already pay for a
dedicated number.
## Architecture
```
┌─────────────────────────┐ HMAC-signed POSTs ┌──────────────────┐
│ Photon Spectrum cloud │ ──────────────────────► │ Hermes Agent │
│ (iMessage line owner) │ │ (Python) │
└─────────────────────────┘ JSON over loopback │ │
▲ ◄────────────────────── │ PhotonAdapter │
│ │ + aiohttp recv │
│ spectrum-ts │ │
│ SDK (Node) │ spawns + super- │
▼ │ vises ▼ │
┌─────────────────────────┐ ├──────────────────┤
│ Node sidecar │ ◄──── X-Hermes- ─ │ Node sidecar │
│ (plugins/.../sidecar) │ Sidecar-Token │ child process │
└─────────────────────────┘ └──────────────────┘
```
Inbound traffic is webhook-only — Hermes runs an aiohttp listener
that verifies `X-Spectrum-Signature` and dedupes on `message.id`.
Outbound traffic goes through a tiny Node sidecar that runs the
`spectrum-ts` SDK. Photon does not currently expose an HTTP
send-message endpoint; their own docs say:
> Pass `space.id` to `Space.send(...)` from a separate `spectrum-ts`
> SDK instance to reply. **No public HTTP send endpoint exists today.**
> — https://photon.codes/docs/webhooks/events
When Photon ships an HTTP send endpoint, `_sidecar_send` is the one
function that swaps and the sidecar disappears. The rest of the
plugin stays the same.
## First-time setup
```bash
# 1. Log in via the device-code flow (opens browser)
hermes photon login
# 2. Full setup: project, user, sidecar deps
hermes photon setup --phone +15551234567
# 3. Expose your webhook URL to the public internet
# (cloudflared, ngrok, your gateway's public hostname, etc.)
# Then register it with Photon:
hermes photon webhook register https://your-host.example.com/photon/webhook
# 4. Save the signing secret it prints to ~/.hermes/.env
# as PHOTON_WEBHOOK_SECRET=...
# Photon only returns it ONCE.
# 5. Start the gateway
hermes gateway start --platform photon
```
## Credentials
Stored in `~/.hermes/auth.json` under `credential_pool`:
```jsonc
{
"credential_pool": {
"photon": [
{ "access_token": "<dashboard-bearer>", "issued_at": ... }
],
"photon_project": [
{ "project_id": "...", "project_secret": "...", "name": "Hermes Agent" }
]
}
}
```
The per-URL webhook signing secret is treated like an API key and
lives in `~/.hermes/.env` as `PHOTON_WEBHOOK_SECRET`.
## Configuration knobs
All env vars are documented in `plugin.yaml`. The most important are:
| Env var | Default | Meaning |
|--------------------------|--------------------|-----------------------------------------|
| `PHOTON_PROJECT_ID` | from auth.json | Spectrum project ID |
| `PHOTON_PROJECT_SECRET` | from auth.json | Spectrum project secret (HTTP Basic) |
| `PHOTON_WEBHOOK_SECRET` | (unset) | Signing secret returned at register |
| `PHOTON_WEBHOOK_PORT` | 8788 | Local port for the aiohttp listener |
| `PHOTON_WEBHOOK_PATH` | /photon/webhook | Path under which the listener mounts |
| `PHOTON_SIDECAR_PORT` | 8789 | Loopback port for sidecar control |
| `PHOTON_HOME_CHANNEL` | (unset) | Default space ID for cron delivery |
| `PHOTON_ALLOWED_USERS` | (unset) | Comma-separated E.164 allowlist |
## Limitations (current Photon API)
- **Attachments are metadata only.** Inbound webhooks include the
filename + MIME type but no download URL. The plugin surfaces a
text marker (`[Photon attachment received: …]`) so the agent knows
something arrived, but cannot read the bytes. Photon's docs note
an attachment retrieval endpoint is on the roadmap.
- **Outbound attachments are not supported yet.** Adding them is
straightforward once the sidecar wires up `attachment(...)` /
`space.send(attachment(...))` from `spectrum-ts`.
- **Reactions, message effects, polls** — not exposed yet; the
`spectrum-ts` SDK supports them, and the sidecar is the natural
place to add them when the agent has reason to use them.
[photon]: https://photon.codes/

View file

@ -0,0 +1,4 @@
"""Photon Spectrum (iMessage) platform plugin entry point."""
from .adapter import register
__all__ = ["register"]

View file

@ -0,0 +1,737 @@
"""
Photon Spectrum (iMessage) platform adapter for Hermes Agent.
Inbound:
Photon delivers signed JSON ``POST``s to a URL we register. The
adapter spins up an aiohttp server on ``PHOTON_WEBHOOK_PORT``,
verifies ``X-Spectrum-Signature`` (HMAC-SHA256 of
``v0:{timestamp}:{body}`` keyed by the per-URL signing secret),
rejects deliveries with a timestamp drift > 5 minutes, dedupes on
``message.id``, and dispatches a normalized ``MessageEvent`` to the
gateway runner via ``BasePlatformAdapter.handle_message``.
Outbound:
Photon does not currently expose a public HTTP send-message
endpoint, so the adapter spawns a small Node sidecar (see
``sidecar/index.mjs``) that runs the ``spectrum-ts`` SDK. Each
``send`` / ``send_typing`` call from Hermes is a loopback POST to
the sidecar with a shared bearer token.
When Photon ships an HTTP send endpoint we can collapse the sidecar
into ``_send_via_http`` and drop the Node dependency entirely.
"""
from __future__ import annotations
import asyncio
import hashlib
import hmac
import json
import logging
import os
import secrets
import shutil
import signal
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional
try:
import httpx
HTTPX_AVAILABLE = True
except ImportError: # pragma: no cover - httpx is already a Hermes dep
HTTPX_AVAILABLE = False
httpx = None # type: ignore[assignment]
try:
from aiohttp import web
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
web = None # type: ignore[assignment]
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
SendResult,
)
from .auth import (
DEFAULT_SPECTRUM_HOST,
load_project_credentials,
_spectrum_host,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
_DEFAULT_WEBHOOK_PORT = 8788
_DEFAULT_WEBHOOK_PATH = "/photon/webhook"
_DEFAULT_WEBHOOK_BIND = "0.0.0.0"
_DEFAULT_SIDECAR_PORT = 8789
_DEFAULT_SIDECAR_BIND = "127.0.0.1"
# Photon iMessage messages from the SDK side have no documented hard
# limit, but the underlying iMessage protocol limits practical message
# size to ~16 KB. Keep a conservative cap that matches BlueBubbles.
_MAX_MESSAGE_LENGTH = 8000
# Spec says reject deliveries older than ~5 minutes for replay protection.
_TIMESTAMP_DRIFT_SECONDS = 300
# Dedup parameters — keep at least 1k IDs for ~48h per Photon's
# at-least-once guidance.
_DEDUP_MAX_SIZE = 4000
_DEDUP_WINDOW_SECONDS = 48 * 3600
_SIDECAR_DIR = Path(__file__).parent / "sidecar"
# ---------------------------------------------------------------------------
# Module-level helpers — also used by check_fn / standalone send
def _coerce_port(value: Any, default: int) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
def check_requirements() -> bool:
"""Return True when both Python deps and the Node sidecar are available."""
if not HTTPX_AVAILABLE or not AIOHTTP_AVAILABLE:
return False
if not shutil.which(os.getenv("PHOTON_NODE_BIN") or "node"):
return False
if not (_SIDECAR_DIR / "node_modules").exists():
# spectrum-ts not installed yet — `hermes photon setup` will
# install it. check_fn still returns False so the gateway
# surfaces the missing-deps state in `hermes setup` / status.
return False
return True
def validate_config(cfg: PlatformConfig) -> bool:
extra = cfg.extra or {}
project_id = extra.get("project_id") or os.getenv("PHOTON_PROJECT_ID")
project_secret = extra.get("project_secret") or os.getenv("PHOTON_PROJECT_SECRET")
if not project_id or not project_secret:
# Fall back to auth.json
stored_id, stored_sec = load_project_credentials()
return bool(stored_id and stored_sec)
return True
def is_connected(cfg: PlatformConfig) -> bool:
return validate_config(cfg)
def _env_enablement() -> Optional[dict]:
"""Seed PlatformConfig.extra from env so env-only setups appear in status."""
project_id, project_secret = load_project_credentials()
if not (project_id and project_secret):
return None
return {
"project_id": project_id,
"project_secret": project_secret,
"webhook_port": _coerce_port(os.getenv("PHOTON_WEBHOOK_PORT"), _DEFAULT_WEBHOOK_PORT),
"webhook_path": os.getenv("PHOTON_WEBHOOK_PATH") or _DEFAULT_WEBHOOK_PATH,
}
# ---------------------------------------------------------------------------
# Signature verification
def verify_signature(
*,
body: bytes,
timestamp_header: str,
signature_header: str,
signing_secret: str,
now: Optional[float] = None,
drift: int = _TIMESTAMP_DRIFT_SECONDS,
) -> bool:
"""Constant-time verify a Photon webhook signature.
Returns True iff the timestamp is within ``drift`` of *now* AND
``signature_header == "v0=" + hmac_sha256(secret, "v0:{ts}:{body}")``.
Exposed at module scope so tests can exercise it without an adapter
instance.
"""
if not timestamp_header or not signature_header or not signing_secret:
return False
try:
ts = int(timestamp_header)
except ValueError:
return False
if abs((now or time.time()) - ts) > drift:
return False
if not signature_header.startswith("v0="):
return False
expected = hmac.new(
signing_secret.encode("utf-8"),
f"v0:{ts}:".encode("utf-8") + body,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(expected, signature_header[3:])
# ---------------------------------------------------------------------------
# Adapter
class PhotonAdapter(BasePlatformAdapter):
"""Inbound: signed webhook on aiohttp. Outbound: Node sidecar via loopback HTTP."""
MAX_MESSAGE_LENGTH = _MAX_MESSAGE_LENGTH
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform("photon"))
extra = config.extra or {}
# Project credentials (env wins, then config.extra, then auth.json).
stored_id, stored_sec = load_project_credentials()
self._project_id: str = (
os.getenv("PHOTON_PROJECT_ID")
or extra.get("project_id")
or stored_id
or ""
)
self._project_secret: str = (
os.getenv("PHOTON_PROJECT_SECRET")
or extra.get("project_secret")
or stored_sec
or ""
)
# Webhook receiver
self._webhook_port = _coerce_port(
extra.get("webhook_port") or os.getenv("PHOTON_WEBHOOK_PORT"),
_DEFAULT_WEBHOOK_PORT,
)
self._webhook_path = (
extra.get("webhook_path")
or os.getenv("PHOTON_WEBHOOK_PATH")
or _DEFAULT_WEBHOOK_PATH
)
self._webhook_bind = (
extra.get("webhook_bind")
or os.getenv("PHOTON_WEBHOOK_BIND")
or _DEFAULT_WEBHOOK_BIND
)
self._webhook_secret: str = (
os.getenv("PHOTON_WEBHOOK_SECRET")
or extra.get("webhook_secret")
or ""
)
# Sidecar
self._sidecar_port = _coerce_port(
extra.get("sidecar_port") or os.getenv("PHOTON_SIDECAR_PORT"),
_DEFAULT_SIDECAR_PORT,
)
self._sidecar_bind = _DEFAULT_SIDECAR_BIND
self._sidecar_token = (
os.getenv("PHOTON_SIDECAR_TOKEN") or secrets.token_hex(16)
)
self._autostart_sidecar = str(
os.getenv("PHOTON_SIDECAR_AUTOSTART", "true")
).lower() not in ("0", "false", "no")
self._node_bin = os.getenv("PHOTON_NODE_BIN") or shutil.which("node") or "node"
# Runtime state
self._runner: Optional["web.AppRunner"] = None
self._sidecar_proc: Optional[subprocess.Popen] = None
self._sidecar_supervisor_task: Optional[asyncio.Task] = None
self._http_client: Optional["httpx.AsyncClient"] = None
# Lightweight in-memory dedup. Photon's at-least-once guarantee
# means we WILL see the same message.id more than once.
self._seen_messages: Dict[str, float] = {}
# -- Connection lifecycle ---------------------------------------------
async def connect(self) -> bool:
if not AIOHTTP_AVAILABLE:
self._set_fatal_error(
"MISSING_DEP",
"aiohttp not installed. Run: pip install aiohttp",
retryable=False,
)
return False
if not HTTPX_AVAILABLE:
self._set_fatal_error(
"MISSING_DEP", "httpx not installed", retryable=False
)
return False
if not self._project_id or not self._project_secret:
self._set_fatal_error(
"MISSING_CREDENTIALS",
"PHOTON_PROJECT_ID and PHOTON_PROJECT_SECRET are required. "
"Run: hermes photon setup",
retryable=False,
)
return False
# Start the aiohttp receiver first; without it the sidecar would
# be able to forward inbound traffic to a closed port.
try:
await self._start_webhook_server()
except OSError as e:
self._set_fatal_error(
"PORT_IN_USE",
f"webhook port {self._webhook_port} unavailable: {e}",
retryable=True,
)
return False
# Spin up the Node sidecar (required for outbound).
if self._autostart_sidecar:
try:
await self._start_sidecar()
except Exception as e:
self._set_fatal_error(
"SIDECAR_FAILED",
f"failed to start Photon sidecar: {e}",
retryable=True,
)
await self._stop_webhook_server()
return False
else:
logger.info("[photon] sidecar autostart disabled — outbound will fail")
self._http_client = httpx.AsyncClient(timeout=30.0)
self._mark_connected()
logger.info(
"[photon] connected — webhook at %s:%d%s, sidecar on %s:%d",
self._webhook_bind, self._webhook_port, self._webhook_path,
self._sidecar_bind, self._sidecar_port,
)
return True
async def disconnect(self) -> None:
await self._stop_sidecar()
await self._stop_webhook_server()
if self._http_client is not None:
try:
await self._http_client.aclose()
except Exception:
pass
self._http_client = None
self._mark_disconnected()
# -- Webhook server ----------------------------------------------------
async def _start_webhook_server(self) -> None:
app = web.Application()
app.router.add_post(self._webhook_path, self._handle_webhook)
app.router.add_get("/healthz", lambda _: web.Response(text="ok"))
self._runner = web.AppRunner(app)
await self._runner.setup()
site = web.TCPSite(self._runner, self._webhook_bind, self._webhook_port)
await site.start()
async def _stop_webhook_server(self) -> None:
if self._runner is not None:
try:
await self._runner.cleanup()
except Exception:
pass
self._runner = None
async def _handle_webhook(self, request: "web.Request") -> "web.Response":
body = await request.read()
if self._webhook_secret:
ts = request.headers.get("X-Spectrum-Timestamp", "")
sig = request.headers.get("X-Spectrum-Signature", "")
if not verify_signature(
body=body,
timestamp_header=ts,
signature_header=sig,
signing_secret=self._webhook_secret,
):
logger.warning("[photon] rejected webhook with bad signature")
return web.Response(status=401, text="invalid signature")
else:
logger.warning(
"[photon] PHOTON_WEBHOOK_SECRET unset — accepting unsigned "
"deliveries. Set the per-URL signing secret returned by "
"register-webhook to enable verification."
)
try:
payload = json.loads(body or b"{}")
except json.JSONDecodeError:
return web.Response(status=400, text="invalid json")
if payload.get("event") != "messages":
# Photon currently emits only `messages`; any future event
# types are ack'd 200 so they don't retry.
return web.Response(text="ok")
msg = payload.get("message") or {}
msg_id = msg.get("id")
if not msg_id:
return web.Response(status=400, text="missing message.id")
if self._is_duplicate(msg_id):
return web.Response(text="ok (dup)")
try:
await self._dispatch_inbound(payload)
except Exception:
logger.exception("[photon] inbound dispatch failed")
# 200 anyway — we own the dedup; failing here would cause
# Photon to retry the same id.
return web.Response(text="ok")
def _is_duplicate(self, msg_id: str) -> bool:
now = time.time()
if len(self._seen_messages) > _DEDUP_MAX_SIZE:
cutoff = now - _DEDUP_WINDOW_SECONDS
self._seen_messages = {
k: v for k, v in self._seen_messages.items() if v > cutoff
}
if msg_id in self._seen_messages:
return True
self._seen_messages[msg_id] = now
return False
async def _dispatch_inbound(self, payload: Dict[str, Any]) -> None:
msg = payload.get("message") or {}
space = msg.get("space") or payload.get("space") or {}
sender = msg.get("sender") or {}
content = msg.get("content") or {}
space_id = space.get("id") or ""
sender_id = sender.get("id") or ""
if not space_id:
logger.warning("[photon] inbound missing space.id")
return
# Space type — Photon documents iMessage DM ids as `any;-;+E164`
# and group ids as `any;+;<chat-guid>`. Use that as the
# heuristic; everything else is treated as DM.
chat_type = "group" if ";+;" in space_id else "dm"
# Timestamp — ISO 8601 from the platform.
ts_str = msg.get("timestamp") or ""
try:
timestamp = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
except ValueError:
timestamp = datetime.now(tz=timezone.utc)
# Content normalization. Spectrum is a discriminated union;
# text vs attachment metadata. Attachments are metadata-only
# today (no download URL) — log + carry the name so the agent
# at least knows something was sent.
if content.get("type") == "text":
text = content.get("text") or ""
mtype = MessageType.TEXT
elif content.get("type") == "attachment":
name = content.get("name") or "(unnamed)"
mime = content.get("mimeType") or ""
text = f"[Photon attachment received: {name} ({mime}) — no download URL yet]"
mtype = _attachment_message_type(mime)
else:
text = f"[Photon content type not handled: {content.get('type')}]"
mtype = MessageType.TEXT
source = self.build_source(
chat_id=space_id,
chat_name=space_id,
chat_type=chat_type,
user_id=sender_id or space_id,
user_name=sender_id or None,
)
event = MessageEvent(
text=text,
message_type=mtype,
source=source,
message_id=msg.get("id"),
raw_message=payload,
timestamp=timestamp,
)
await self.handle_message(event)
# -- Sidecar lifecycle -------------------------------------------------
async def _start_sidecar(self) -> None:
if not (_SIDECAR_DIR / "node_modules").exists():
raise RuntimeError(
f"Photon sidecar deps not installed. Run: "
f"cd {_SIDECAR_DIR} && npm install (or `hermes photon setup`)"
)
env = os.environ.copy()
env["PHOTON_PROJECT_ID"] = self._project_id
env["PHOTON_PROJECT_SECRET"] = self._project_secret
env["PHOTON_SIDECAR_PORT"] = str(self._sidecar_port)
env["PHOTON_SIDECAR_BIND"] = self._sidecar_bind
env["PHOTON_SIDECAR_TOKEN"] = self._sidecar_token
self._sidecar_proc = subprocess.Popen( # noqa: S603
[self._node_bin, str(_SIDECAR_DIR / "index.mjs")],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env,
start_new_session=(sys.platform != "win32"),
)
# Pump sidecar stderr/stdout into our logger so users see crashes.
loop = asyncio.get_event_loop()
self._sidecar_supervisor_task = loop.create_task(
self._supervise_sidecar(self._sidecar_proc)
)
# Wait for /healthz to come up — give it up to 15s on cold start.
deadline = time.time() + 15.0
last_err: Optional[Exception] = None
async with httpx.AsyncClient(timeout=2.0) as client:
while time.time() < deadline:
if self._sidecar_proc.poll() is not None:
raise RuntimeError(
f"Photon sidecar exited with code "
f"{self._sidecar_proc.returncode} before becoming ready"
)
try:
resp = await client.post(
f"http://{self._sidecar_bind}:{self._sidecar_port}/healthz",
headers={"X-Hermes-Sidecar-Token": self._sidecar_token},
)
if resp.status_code == 200:
return
except httpx.RequestError as e:
last_err = e
await asyncio.sleep(0.2)
raise RuntimeError(
f"Photon sidecar did not become ready within 15s: {last_err}"
)
async def _supervise_sidecar(self, proc: subprocess.Popen) -> None:
"""Pump the sidecar's stdout/stderr into our logger."""
loop = asyncio.get_event_loop()
try:
while True:
line = await loop.run_in_executor(None, proc.stdout.readline)
if not line:
break
logger.info("[photon-sidecar] %s", line.decode("utf-8", "replace").rstrip())
except Exception as e: # pragma: no cover - defensive
logger.warning("[photon-sidecar] supervisor exited: %s", e)
async def _stop_sidecar(self) -> None:
proc = self._sidecar_proc
if proc is None:
return
try:
# Polite shutdown first.
if self._http_client is not None:
try:
await self._http_client.post(
f"http://{self._sidecar_bind}:{self._sidecar_port}/shutdown",
headers={"X-Hermes-Sidecar-Token": self._sidecar_token},
timeout=2.0,
)
except Exception:
pass
try:
proc.wait(timeout=3.0)
except subprocess.TimeoutExpired:
if sys.platform != "win32":
try:
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
except (ProcessLookupError, PermissionError):
proc.terminate()
else:
proc.terminate()
try:
proc.wait(timeout=2.0)
except subprocess.TimeoutExpired:
proc.kill()
finally:
self._sidecar_proc = None
if self._sidecar_supervisor_task is not None:
self._sidecar_supervisor_task.cancel()
self._sidecar_supervisor_task = None
# -- Outbound ----------------------------------------------------------
async def send(
self,
chat_id: str,
content: str,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
return await self._sidecar_send(chat_id, content, reply_to=reply_to)
async def send_typing(self, chat_id: str, metadata=None) -> None:
try:
await self._sidecar_call("/typing", {"spaceId": chat_id})
except Exception as e:
logger.debug("[photon] send_typing failed: %s", e)
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
"""Return whatever we know about a Spectrum space id.
Photon's `space.id` is opaque (`any;-;+E164` for DMs,
`any;+;<guid>` for groups). We surface that shape directly so
the gateway has something to show in session pickers / logs.
"""
chat_type = "group" if ";+;" in chat_id else "dm"
return {"name": chat_id, "type": chat_type, "id": chat_id}
async def _sidecar_send(
self, space_id: str, text: str, *, reply_to: Optional[str] = None,
) -> SendResult:
if len(text) > self.MAX_MESSAGE_LENGTH:
logger.warning(
"[photon] truncating outbound from %d to %d chars",
len(text), self.MAX_MESSAGE_LENGTH,
)
text = text[: self.MAX_MESSAGE_LENGTH]
body: Dict[str, Any] = {"spaceId": space_id, "text": text}
if reply_to:
body["replyTo"] = reply_to
try:
data = await self._sidecar_call("/send", body)
except Exception as e:
return SendResult(success=False, error=str(e))
return SendResult(success=True, message_id=data.get("messageId"))
async def _sidecar_call(self, path: str, body: Dict[str, Any]) -> Dict[str, Any]:
if self._http_client is None:
raise RuntimeError("Photon adapter not connected")
resp = await self._http_client.post(
f"http://{self._sidecar_bind}:{self._sidecar_port}{path}",
json=body,
headers={"X-Hermes-Sidecar-Token": self._sidecar_token},
timeout=30.0,
)
if resp.status_code != 200:
raise RuntimeError(
f"Photon sidecar {path} returned {resp.status_code}: {resp.text[:200]}"
)
data = resp.json() or {}
if not data.get("ok"):
raise RuntimeError(
f"Photon sidecar {path} reported error: {data.get('error')}"
)
return data
# ---------------------------------------------------------------------------
# Helpers
def _attachment_message_type(mime: str) -> MessageType:
mime = (mime or "").lower()
if mime.startswith("image/"):
return MessageType.PHOTO
if mime.startswith("video/"):
return MessageType.VIDEO
if mime.startswith("audio/"):
return MessageType.AUDIO
if mime.startswith("application/"):
return MessageType.DOCUMENT
return MessageType.DOCUMENT
# ---------------------------------------------------------------------------
# Standalone (out-of-process) send for cron deliveries when the gateway
# is not co-resident. Spins up an ephemeral sidecar call by spawning
# the existing sidecar binary one-shot; if a live sidecar is already
# listening on the configured port we reuse it.
async def _standalone_send(
pconfig: PlatformConfig,
chat_id: str,
message: str,
*,
thread_id: Optional[str] = None, # noqa: ARG001 — Spectrum has no threads yet
media_files: Optional[list] = None, # noqa: ARG001 — attachment send not supported yet
force_document: bool = False, # noqa: ARG001
) -> Dict[str, Any]:
if not HTTPX_AVAILABLE:
return {"error": "httpx not installed"}
port = _coerce_port(
(pconfig.extra or {}).get("sidecar_port") or os.getenv("PHOTON_SIDECAR_PORT"),
_DEFAULT_SIDECAR_PORT,
)
token = os.getenv("PHOTON_SIDECAR_TOKEN")
if not token:
return {
"error": (
"Photon standalone send requires a running sidecar with "
"PHOTON_SIDECAR_TOKEN set in the environment. Cron processes "
"cannot spawn the sidecar themselves."
)
}
body: Dict[str, Any] = {"spaceId": chat_id, "text": message[:_MAX_MESSAGE_LENGTH]}
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"http://{_DEFAULT_SIDECAR_BIND}:{port}/send",
json=body,
headers={"X-Hermes-Sidecar-Token": token},
)
if resp.status_code != 200:
return {"error": f"sidecar returned {resp.status_code}: {resp.text[:200]}"}
data = resp.json() or {}
if not data.get("ok"):
return {"error": data.get("error") or "sidecar reported failure"}
return {"success": True, "message_id": data.get("messageId")}
except Exception as e:
return {"error": f"Photon standalone send failed: {e}"}
# ---------------------------------------------------------------------------
# Plugin entry point
def register(ctx) -> None:
"""Called by the Hermes plugin loader at startup."""
ctx.register_platform(
name="photon",
label="Photon iMessage",
adapter_factory=lambda cfg: PhotonAdapter(cfg),
check_fn=check_requirements,
validate_config=validate_config,
is_connected=is_connected,
required_env=["PHOTON_PROJECT_ID", "PHOTON_PROJECT_SECRET"],
install_hint=(
"Run: hermes photon setup (logs in via device flow, creates a "
"Spectrum project, links your phone number, installs the "
"spectrum-ts sidecar)."
),
env_enablement_fn=_env_enablement,
cron_deliver_env_var="PHOTON_HOME_CHANNEL",
standalone_sender_fn=_standalone_send,
allowed_users_env="PHOTON_ALLOWED_USERS",
allow_all_env="PHOTON_ALLOW_ALL_USERS",
max_message_length=_MAX_MESSAGE_LENGTH,
emoji="📱",
# iMessage carries E.164 phone numbers — treat session descriptions
# as PII-sensitive so they get redacted in logs.
pii_safe=False,
allow_update_command=True,
platform_hint=(
"You are communicating via Photon Spectrum (iMessage). "
"Treat replies like regular text messages — short, friendly, no "
"markdown rendering. Recipient identifiers are E.164 phone "
"numbers; never expose them in responses unless the user asked. "
"Attachments arrive as metadata only (no download URL yet)."
),
)
# Register CLI subcommands — `hermes photon ...`
from . import cli as _cli # local import to avoid argparse at module load
ctx.register_cli_command(
name="photon",
help="Set up and manage the Photon iMessage integration",
setup_fn=_cli.register_cli,
handler_fn=_cli.dispatch,
)

View file

@ -0,0 +1,438 @@
"""
Photon Dashboard + Spectrum API client and device-code login flow.
This module is pure Python it intentionally does not depend on
``spectrum-ts``. All management-plane operations (login, create
project, create user, register webhook) talk to Photon's HTTP API
directly:
Dashboard API https://app.photon.codes/api/...
OAuth bearer token from device flow
Spectrum API https://spectrum.photon.codes/projects/{id}/...
HTTP Basic with (projectId, projectSecret)
The webhook receiver + Node sidecar in ``adapter.py`` consume the
credentials this module persists to ``~/.hermes/auth.json``.
Reference docs (read at integration time):
https://photon.codes/docs/api-reference/introduction
https://photon.codes/docs/api-reference/device-login/request-device-+-user-code
https://photon.codes/docs/api-reference/device-login/exchange-device-code-for-token
https://photon.codes/docs/api-reference/projects/create-project
https://photon.codes/docs/api-reference/users/create-user
https://photon.codes/docs/webhooks/overview
"""
from __future__ import annotations
import json
import logging
import os
import re
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
try:
import httpx
except ImportError: # pragma: no cover - httpx is a hermes dependency
httpx = None # type: ignore[assignment]
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# Photon's published OAuth device-client identifier for first-party CLIs.
# We use a fixed "hermes-agent" client_id string — Photon's device endpoint
# accepts any opaque client_id and ties the bearer token to the approving
# user, not to the client. If Photon later requires registered clients,
# this is the one knob to update.
DEFAULT_CLIENT_ID = "hermes-agent"
DEFAULT_DASHBOARD_HOST = "https://app.photon.codes"
DEFAULT_SPECTRUM_HOST = "https://spectrum.photon.codes"
# Polling defaults per RFC 8628. Photon may override via `interval` /
# `expires_in` fields in the device-code response — those win.
DEFAULT_POLL_INTERVAL = 5
DEFAULT_POLL_TIMEOUT = 900 # 15 minutes is conservative; Photon returns expires_in
E164_RE = re.compile(r"^\+[1-9]\d{6,14}$")
# ---------------------------------------------------------------------------
# auth.json helpers — share the file with the rest of hermes-agent.
def _auth_json_path() -> Path:
"""Resolve ``~/.hermes/auth.json`` honouring the active Hermes profile."""
try:
from hermes_constants import get_hermes_home # type: ignore
return Path(get_hermes_home()) / "auth.json"
except Exception:
return Path(os.path.expanduser("~/.hermes")) / "auth.json"
def _load_auth() -> Dict[str, Any]:
path = _auth_json_path()
if not path.exists():
return {}
try:
with path.open("r", encoding="utf-8") as fh:
return json.load(fh) or {}
except (OSError, json.JSONDecodeError) as e:
logger.warning("photon: could not read %s: %s", path, e)
return {}
def _save_auth(data: Dict[str, Any]) -> None:
path = _auth_json_path()
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(".json.tmp")
with tmp.open("w", encoding="utf-8") as fh:
json.dump(data, fh, indent=2, sort_keys=True)
try:
os.chmod(tmp, 0o600)
except OSError:
pass
tmp.replace(path)
def load_photon_token() -> Optional[str]:
"""Return the bearer token stored by ``login()`` or ``None``."""
auth = _load_auth()
pool = auth.get("credential_pool", {}).get("photon") or []
if isinstance(pool, list) and pool:
token = pool[0].get("access_token") or pool[0].get("token")
if token:
return str(token)
# Backwards-compat shape: providers.photon.access_token
legacy = auth.get("providers", {}).get("photon", {})
if legacy.get("access_token"):
return str(legacy["access_token"])
return None
def store_photon_token(token: str) -> None:
"""Persist a dashboard bearer token under ``credential_pool.photon``."""
auth = _load_auth()
auth.setdefault("credential_pool", {})["photon"] = [
{"access_token": token, "issued_at": int(time.time())}
]
_save_auth(auth)
def load_project_credentials() -> Tuple[Optional[str], Optional[str]]:
"""Return ``(project_id, project_secret)`` from auth.json + env override."""
env_id = os.getenv("PHOTON_PROJECT_ID")
env_sec = os.getenv("PHOTON_PROJECT_SECRET")
if env_id and env_sec:
return env_id, env_sec
auth = _load_auth()
proj = auth.get("credential_pool", {}).get("photon_project") or []
if isinstance(proj, list) and proj:
entry = proj[0]
return (
env_id or entry.get("project_id"),
env_sec or entry.get("project_secret"),
)
return env_id, env_sec
def store_project_credentials(project_id: str, project_secret: str, **extra: Any) -> None:
"""Persist the Spectrum project's id+secret under ``credential_pool.photon_project``."""
auth = _load_auth()
record = {
"project_id": project_id,
"project_secret": project_secret,
"issued_at": int(time.time()),
}
record.update(extra)
auth.setdefault("credential_pool", {})["photon_project"] = [record]
_save_auth(auth)
# ---------------------------------------------------------------------------
# Device login flow (RFC 8628)
@dataclass
class DeviceCode:
device_code: str
user_code: str
verification_uri: str
verification_uri_complete: Optional[str]
expires_in: int
interval: int
def _dashboard_host() -> str:
return (os.getenv("PHOTON_DASHBOARD_HOST") or DEFAULT_DASHBOARD_HOST).rstrip("/")
def _spectrum_host() -> str:
return (os.getenv("PHOTON_API_HOST") or DEFAULT_SPECTRUM_HOST).rstrip("/")
def request_device_code(
*, client_id: str = DEFAULT_CLIENT_ID, scope: Optional[str] = None,
) -> DeviceCode:
"""POST ``/api/auth/device/code`` and return the device + user codes."""
if httpx is None:
raise RuntimeError("httpx is required for Photon device login")
url = f"{_dashboard_host()}/api/auth/device/code"
body: Dict[str, Any] = {"client_id": client_id}
if scope:
body["scope"] = scope
resp = httpx.post(url, json=body, timeout=30.0)
resp.raise_for_status()
data = resp.json()
return DeviceCode(
device_code=data["device_code"],
user_code=data["user_code"],
verification_uri=data["verification_uri"],
verification_uri_complete=data.get("verification_uri_complete"),
expires_in=int(data.get("expires_in") or DEFAULT_POLL_TIMEOUT),
interval=int(data.get("interval") or DEFAULT_POLL_INTERVAL),
)
def poll_for_token(
code: DeviceCode,
*,
client_id: str = DEFAULT_CLIENT_ID,
timeout: Optional[int] = None,
interval: Optional[int] = None,
on_pending: Optional[callable] = None,
) -> str:
"""Poll ``/api/auth/device/token`` until the user approves.
Returns the bearer token from the ``set-auth-token`` response header
(Photon's documented mechanism). Falls back to ``session.access_token``
in the JSON body if the header is absent see the API spec.
"""
if httpx is None:
raise RuntimeError("httpx is required for Photon device login")
url = f"{_dashboard_host()}/api/auth/device/token"
deadline = time.time() + (timeout or code.expires_in or DEFAULT_POLL_TIMEOUT)
sleep = interval or code.interval or DEFAULT_POLL_INTERVAL
while time.time() < deadline:
try:
resp = httpx.post(
url,
json={
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"device_code": code.device_code,
"client_id": client_id,
},
timeout=30.0,
)
except httpx.RequestError as e:
logger.warning("photon: device-token poll failed: %s", e)
time.sleep(sleep)
continue
if resp.status_code == 200:
token = resp.headers.get("set-auth-token")
if not token:
body = resp.json() or {}
session = body.get("session") or {}
token = session.get("access_token") or body.get("access_token")
if not token:
raise RuntimeError(
"Photon returned 200 but no token in headers or body"
)
return token
if resp.status_code == 400:
# RFC 8628 §3.5 — error codes are returned with 400.
body: Dict[str, Any] = {}
try:
body = resp.json() or {}
except json.JSONDecodeError:
pass
err = body.get("error") or body.get("message") or ""
if err in ("authorization_pending", "slow_down"):
if on_pending:
try:
on_pending()
except Exception:
pass
if err == "slow_down":
sleep += 5
time.sleep(sleep)
continue
if err in ("expired_token", "access_denied"):
raise RuntimeError(f"Photon login failed: {err}")
# Unknown error — surface it
raise RuntimeError(f"Photon device token error: {err or resp.text}")
# Unexpected status; log and retry
logger.warning(
"photon: device-token unexpected status %s: %s",
resp.status_code, resp.text[:200],
)
time.sleep(sleep)
raise TimeoutError("Photon device login timed out")
def login_device_flow(
*,
client_id: str = DEFAULT_CLIENT_ID,
open_browser: bool = True,
on_user_code: Optional[callable] = None,
) -> str:
"""Run the full device-code login flow and persist the token.
Returns the bearer token. ``on_user_code`` is a callback receiving the
:class:`DeviceCode` so callers can print + optionally open the browser.
"""
code = request_device_code(client_id=client_id)
if on_user_code:
try:
on_user_code(code)
except Exception:
pass
if open_browser:
try:
import webbrowser
target = code.verification_uri_complete or code.verification_uri
webbrowser.open(target, new=2)
except Exception:
pass
token = poll_for_token(code, client_id=client_id)
store_photon_token(token)
return token
# ---------------------------------------------------------------------------
# Dashboard API: create project
def create_project(
token: str,
*,
name: str,
location: str = "United States",
platforms: Optional[list] = None,
) -> Dict[str, Any]:
"""POST ``/api/projects/`` with ``spectrum: true`` and return the response.
The response includes ``spectrumProjectId`` and ``projectSecret`` those
are the HTTP Basic credentials for the Spectrum API. Photon only
returns ``projectSecret`` to project owners at creation time.
"""
if httpx is None:
raise RuntimeError("httpx is required for Photon project creation")
url = f"{_dashboard_host()}/api/projects/"
body: Dict[str, Any] = {
"name": name,
"location": location,
"spectrum": True,
"platforms": platforms or ["imessage"],
}
resp = httpx.post(
url,
json=body,
headers={"Authorization": f"Bearer {token}"},
timeout=30.0,
)
resp.raise_for_status()
return resp.json()
# ---------------------------------------------------------------------------
# Spectrum API: create user
def create_user(
project_id: str,
project_secret: str,
*,
phone_number: str,
user_type: str = "shared",
first_name: Optional[str] = None,
last_name: Optional[str] = None,
email: Optional[str] = None,
assigned_phone_number: Optional[str] = None,
) -> Dict[str, Any]:
"""POST ``/projects/{id}/users/`` on the Spectrum API.
For free users we always pass ``type=shared``; Photon's Cosmos pool
assigns the iMessage line. ``assigned_phone_number`` is only valid
for the paid ``dedicated`` mode.
"""
if httpx is None:
raise RuntimeError("httpx is required for Photon user creation")
if not E164_RE.match(phone_number):
raise ValueError(
f"phone_number must be E.164 (e.g. +15551234567); got {phone_number!r}"
)
url = f"{_spectrum_host()}/projects/{project_id}/users/"
body: Dict[str, Any] = {"type": user_type, "phoneNumber": phone_number}
if first_name:
body["firstName"] = first_name
if last_name:
body["lastName"] = last_name
if email:
body["email"] = email
if assigned_phone_number:
body["assignedPhoneNumber"] = assigned_phone_number
resp = httpx.post(
url,
json=body,
auth=(project_id, project_secret),
timeout=30.0,
)
resp.raise_for_status()
data = resp.json() or {}
if not data.get("succeed"):
raise RuntimeError(
f"Photon create-user failed: {data.get('message') or data}"
)
return data.get("data") or {}
# ---------------------------------------------------------------------------
# Spectrum API: webhook registration
#
# Endpoints from https://photon.codes/docs/webhooks/overview:
# POST /projects/{id}/webhooks/ register, returns signing secret ONCE
# GET /projects/{id}/webhooks/ list
# DELETE /projects/{id}/webhooks/{wid} remove
def register_webhook(
project_id: str, project_secret: str, *, webhook_url: str,
) -> Dict[str, Any]:
if httpx is None:
raise RuntimeError("httpx is required for Photon webhook registration")
url = f"{_spectrum_host()}/projects/{project_id}/webhooks/"
resp = httpx.post(
url,
json={"webhookUrl": webhook_url},
auth=(project_id, project_secret),
timeout=30.0,
)
resp.raise_for_status()
data = resp.json() or {}
if not data.get("succeed"):
raise RuntimeError(
f"Photon register-webhook failed: {data.get('message') or data}"
)
return data.get("data") or {}
def list_webhooks(project_id: str, project_secret: str) -> list:
if httpx is None:
raise RuntimeError("httpx is required for Photon webhook listing")
url = f"{_spectrum_host()}/projects/{project_id}/webhooks/"
resp = httpx.get(url, auth=(project_id, project_secret), timeout=30.0)
resp.raise_for_status()
data = resp.json() or {}
return data.get("data") or []
def delete_webhook(
project_id: str, project_secret: str, *, webhook_id: str,
) -> None:
if httpx is None:
raise RuntimeError("httpx is required for Photon webhook deletion")
url = f"{_spectrum_host()}/projects/{project_id}/webhooks/{webhook_id}"
resp = httpx.delete(url, auth=(project_id, project_secret), timeout=30.0)
if resp.status_code not in (200, 204, 404):
resp.raise_for_status()

View file

@ -0,0 +1,304 @@
"""
``hermes photon ...`` CLI subcommands registered by the plugin via
``ctx.register_cli_command()``.
Subcommands:
login run the device-code OAuth flow
setup full first-time setup (login + project + user + sidecar)
status show login + project + sidecar dep state
install-sidecar npm install inside plugins/platforms/photon/sidecar/
webhook register register the local webhook URL with Photon
webhook list list registered webhooks
webhook delete delete a webhook by id
"""
from __future__ import annotations
import argparse
import getpass
import json
import os
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Any, Optional
from . import auth as photon_auth
_SIDECAR_DIR = Path(__file__).parent / "sidecar"
# ---------------------------------------------------------------------------
# argparse wiring
def register_cli(parser: argparse.ArgumentParser) -> None:
"""Wire up `hermes photon ...` subcommands."""
subs = parser.add_subparsers(dest="photon_command", required=False)
p_login = subs.add_parser("login", help="Authenticate with Photon (device flow)")
p_login.add_argument("--no-browser", action="store_true",
help="Don't try to open a browser; print the URL only")
p_setup = subs.add_parser("setup", help="First-time setup (login + project + user + sidecar)")
p_setup.add_argument("--project-name", default=None, help="Project name (default: 'Hermes Agent')")
p_setup.add_argument("--phone", default=None, help="Your E.164 phone number (e.g. +15551234567)")
p_setup.add_argument("--first-name", default=None)
p_setup.add_argument("--last-name", default=None)
p_setup.add_argument("--email", default=None)
p_setup.add_argument("--no-browser", action="store_true")
p_setup.add_argument("--skip-sidecar-install", action="store_true",
help="Skip `npm install` inside the sidecar directory")
subs.add_parser("status", help="Show login + project + sidecar dep state")
subs.add_parser("install-sidecar", help="Run npm install inside the sidecar directory")
p_hook = subs.add_parser("webhook", help="Manage Photon webhook registrations")
hook_subs = p_hook.add_subparsers(dest="photon_webhook_command", required=True)
p_hook_reg = hook_subs.add_parser("register", help="Register a webhook URL")
p_hook_reg.add_argument("url", help="Publicly reachable URL Photon should POST to")
hook_subs.add_parser("list", help="List registered webhooks for the current project")
p_hook_del = hook_subs.add_parser("delete", help="Delete a webhook by id")
p_hook_del.add_argument("webhook_id")
parser.set_defaults(func=dispatch)
# ---------------------------------------------------------------------------
# Dispatch
def dispatch(args: argparse.Namespace) -> int:
sub = getattr(args, "photon_command", None)
if sub is None:
# No subcommand given — show status by default.
return _cmd_status(args)
if sub == "login":
return _cmd_login(args)
if sub == "setup":
return _cmd_setup(args)
if sub == "status":
return _cmd_status(args)
if sub == "install-sidecar":
return _cmd_install_sidecar(args)
if sub == "webhook":
return _cmd_webhook(args)
print(f"unknown subcommand: {sub}", file=sys.stderr)
return 2
# ---------------------------------------------------------------------------
# Subcommand handlers
def _cmd_login(args: argparse.Namespace) -> int:
def _print_code(code):
target = code.verification_uri_complete or code.verification_uri
print()
print("┌─ Photon device login ────────────────────────────────────────")
print(f"│ Open this URL: {target}")
print(f"│ Enter the code: {code.user_code}")
print("│ (waiting for approval — Ctrl-C to cancel)")
print("└──────────────────────────────────────────────────────────────")
print()
try:
token = photon_auth.login_device_flow(
open_browser=not args.no_browser,
on_user_code=_print_code,
)
except Exception as e:
print(f"login failed: {e}", file=sys.stderr)
return 1
print(f"✓ logged in — token saved to {photon_auth._auth_json_path()}")
print(f" (first 8 chars: {token[:8]}…)")
return 0
def _cmd_setup(args: argparse.Namespace) -> int:
# 1. Login (skip if we already have a token).
token = photon_auth.load_photon_token()
if not token:
print("[1/4] No Photon token found — running device login...")
rc = _cmd_login(args)
if rc != 0:
return rc
token = photon_auth.load_photon_token()
if not token:
print("login completed but token was not stored", file=sys.stderr)
return 1
else:
print("[1/4] Reusing existing Photon token")
# 2. Create (or surface existing) project.
project_id, project_secret = photon_auth.load_project_credentials()
if project_id and project_secret:
print(f"[2/4] Reusing existing Photon project: {project_id}")
else:
name = args.project_name or "Hermes Agent"
print(f"[2/4] Creating Photon project '{name}' (spectrum=true, imessage)...")
try:
data = photon_auth.create_project(token, name=name)
except Exception as e:
print(f"create-project failed: {e}", file=sys.stderr)
return 1
project_id = data.get("spectrumProjectId") or data.get("id") or ""
project_secret = data.get("projectSecret") or ""
if not project_id or not project_secret:
print(
"create-project did not return spectrumProjectId + "
"projectSecret. Re-run after enabling Spectrum on the "
"project, or open https://app.photon.codes/ to fetch the "
"secret manually.",
file=sys.stderr,
)
return 1
photon_auth.store_project_credentials(project_id, project_secret, name=name)
print(f" project_id = {project_id}")
print(f" project_secret saved (first 8 chars: {project_secret[:8]}…)")
# 3. Create a Spectrum user for the operator.
phone = args.phone or _prompt(
"Your iMessage phone number (E.164, e.g. +15551234567): "
)
if not phone:
print("[3/4] Skipped user creation (no phone given). Re-run with --phone later.")
else:
print(f"[3/4] Creating shared Spectrum user for {phone}...")
try:
user = photon_auth.create_user(
project_id, project_secret,
phone_number=phone,
first_name=args.first_name,
last_name=args.last_name,
email=args.email,
)
except Exception as e:
print(f"create-user failed: {e}", file=sys.stderr)
return 1
assigned = user.get("assignedPhoneNumber") or "(pending)"
print(f" ✓ assigned iMessage number: {assigned}")
# 4. Sidecar deps.
if args.skip_sidecar_install:
print("[4/4] Skipping sidecar npm install (--skip-sidecar-install)")
else:
print("[4/4] Installing Node sidecar deps (spectrum-ts)...")
rc = _install_sidecar()
if rc != 0:
return rc
print()
print("✓ Photon setup complete.")
print(" Next: register a webhook URL Photon can reach:")
print(" hermes photon webhook register https://YOUR-PUBLIC-URL/photon/webhook")
print(" Then start the gateway:")
print(" hermes gateway start --platform photon")
return 0
def _cmd_status(_args: argparse.Namespace) -> int:
token = photon_auth.load_photon_token()
project_id, project_secret = photon_auth.load_project_credentials()
node_bin = os.getenv("PHOTON_NODE_BIN") or shutil.which("node")
sidecar_installed = (_SIDECAR_DIR / "node_modules").exists()
webhook_secret = bool(os.getenv("PHOTON_WEBHOOK_SECRET"))
print("Photon iMessage status")
print("──────────────────────")
print(f" device token : {'✓ stored' if token else '✗ missing (run `hermes photon login`)'}")
print(f" project id : {project_id or '✗ missing'}")
print(f" project secret : {'✓ stored' if project_secret else '✗ missing'}")
print(f" node binary : {node_bin or '✗ missing (install Node 18+)'}")
print(f" sidecar deps : {'✓ installed' if sidecar_installed else '✗ run `hermes photon install-sidecar`'}")
print(f" webhook secret : {'✓ set' if webhook_secret else '⚠ unset — verification disabled'}")
return 0
def _cmd_install_sidecar(_args: argparse.Namespace) -> int:
rc = _install_sidecar()
return rc
def _install_sidecar() -> int:
npm = shutil.which("npm") or "npm"
if not shutil.which(npm):
print(
"npm is not on PATH. Install Node.js 18+ (https://nodejs.org/) "
"and re-run.",
file=sys.stderr,
)
return 1
print(f" $ cd {_SIDECAR_DIR} && {npm} install")
proc = subprocess.run( # noqa: S603
[npm, "install"],
cwd=str(_SIDECAR_DIR),
check=False,
)
if proc.returncode != 0:
print("npm install failed", file=sys.stderr)
return proc.returncode
def _cmd_webhook(args: argparse.Namespace) -> int:
sub = getattr(args, "photon_webhook_command", None)
project_id, project_secret = photon_auth.load_project_credentials()
if not (project_id and project_secret):
print(
"no Photon project configured — run `hermes photon setup` first",
file=sys.stderr,
)
return 1
if sub == "register":
try:
data = photon_auth.register_webhook(
project_id, project_secret, webhook_url=args.url
)
except Exception as e:
print(f"register failed: {e}", file=sys.stderr)
return 1
print(json.dumps(data, indent=2))
secret = data.get("signingSecret") or data.get("secret")
if secret:
print()
print("‼ Save this signing secret NOW — Photon only returns it once.")
print(f" Add to ~/.hermes/.env:")
print(f" PHOTON_WEBHOOK_SECRET={secret}")
return 0
if sub == "list":
try:
data = photon_auth.list_webhooks(project_id, project_secret)
except Exception as e:
print(f"list failed: {e}", file=sys.stderr)
return 1
print(json.dumps(data, indent=2))
return 0
if sub == "delete":
try:
photon_auth.delete_webhook(
project_id, project_secret, webhook_id=args.webhook_id
)
except Exception as e:
print(f"delete failed: {e}", file=sys.stderr)
return 1
print(f"deleted webhook {args.webhook_id}")
return 0
print(f"unknown webhook subcommand: {sub}", file=sys.stderr)
return 2
# ---------------------------------------------------------------------------
# Small interactive helpers
def _prompt(prompt: str, *, secret: bool = False) -> str:
if not sys.stdin.isatty():
return ""
try:
if secret:
return getpass.getpass(prompt).strip()
return input(prompt).strip()
except (KeyboardInterrupt, EOFError):
print()
return ""

View file

@ -0,0 +1,83 @@
name: photon-platform
label: Photon iMessage
kind: platform
version: 0.1.0
description: >
Photon Spectrum gateway adapter for Hermes Agent.
Connects to iMessage (and other Spectrum interfaces) through Photon's
managed Spectrum platform. Inbound messages arrive as signed webhooks
on a local aiohttp server; outbound messages are sent via a small
supervised Node sidecar that runs the `spectrum-ts` SDK (Photon does
not currently expose a public HTTP send endpoint).
The plugin ships with a `hermes photon` CLI for the one-time login
+ project + user setup, persists Spectrum credentials to
``~/.hermes/auth.json`` under ``credential_pool.photon`` (token) and
``credential_pool.photon_project`` (project id + secret), and exposes
Photon's free shared-line model so users can get started without a
paid plan.
author: NousResearch
requires_env:
- name: PHOTON_PROJECT_ID
description: "Spectrum project ID (set by `hermes photon setup`)"
prompt: "Photon Spectrum project ID"
url: "https://app.photon.codes/"
password: false
- name: PHOTON_PROJECT_SECRET
description: "Spectrum project secret (set by `hermes photon setup`)"
prompt: "Photon Spectrum project secret"
url: "https://app.photon.codes/"
password: true
optional_env:
- name: PHOTON_WEBHOOK_SECRET
description: "Per-URL HMAC-SHA256 signing secret returned at webhook registration"
prompt: "Photon webhook signing secret"
password: true
- name: PHOTON_WEBHOOK_PORT
description: "Local port the webhook receiver listens on (default 8788)"
prompt: "Webhook receiver port"
password: false
- name: PHOTON_WEBHOOK_PATH
description: "Path the webhook receiver listens on (default /photon/webhook)"
prompt: "Webhook receiver path"
password: false
- name: PHOTON_WEBHOOK_BIND
description: "Bind address for the webhook receiver (default 0.0.0.0)"
prompt: "Webhook bind address"
password: false
- name: PHOTON_SIDECAR_PORT
description: "Loopback port for the Node sidecar control channel (default 8789)"
prompt: "Sidecar control port"
password: false
- name: PHOTON_SIDECAR_AUTOSTART
description: "Spawn the Node sidecar on connect (true/false, default true)"
prompt: "Auto-start the sidecar?"
password: false
- name: PHOTON_NODE_BIN
description: "Path to the node binary (default: shutil.which('node'))"
prompt: "Node executable path"
password: false
- name: PHOTON_API_HOST
description: "Spectrum management API host (default https://spectrum.photon.codes)"
prompt: "Spectrum API host"
password: false
- name: PHOTON_DASHBOARD_HOST
description: "Dashboard API host (default https://app.photon.codes)"
prompt: "Dashboard host"
password: false
- name: PHOTON_ALLOWED_USERS
description: "Comma-separated E.164 phone numbers allowed to talk to the bot"
prompt: "Allowed users (comma-separated)"
password: false
- name: PHOTON_ALLOW_ALL_USERS
description: "Allow any sender to trigger the bot (dev only — disables allowlist)"
prompt: "Allow all users? (true/false)"
password: false
- name: PHOTON_HOME_CHANNEL
description: "Default Spectrum space ID for cron / notification delivery"
prompt: "Home space ID"
password: false
- name: PHOTON_HOME_CHANNEL_NAME
description: "Human label for the home channel"
prompt: "Home channel display name"
password: false

View file

@ -0,0 +1,52 @@
# Photon sidecar
Small Node helper that bridges Hermes Agent to Photon's Spectrum SDK
(`spectrum-ts`). Hermes is Python; Photon has no public HTTP
send-message endpoint today; replies therefore go through this sidecar.
The sidecar:
- runs `Spectrum({ projectId, projectSecret, providers: [imessage.config()] })`
- exposes a loopback-only HTTP control channel for the Python adapter
to push send/typing requests (auth via `X-Hermes-Sidecar-Token`)
- drains the inbound message stream so `spectrum-ts` keeps its
reconnect/heartbeat machinery alive (real inbound delivery is via
Photon's signed webhook hitting our Python aiohttp server)
## Install
```bash
cd plugins/platforms/photon/sidecar
npm install
```
The Hermes plugin's `hermes photon setup` command runs `npm install`
here automatically.
## Run standalone
For debugging:
```bash
PHOTON_PROJECT_ID=... PHOTON_PROJECT_SECRET=... \
PHOTON_SIDECAR_PORT=8789 PHOTON_SIDECAR_TOKEN=$(openssl rand -hex 16) \
node index.mjs
```
In normal use, the Python adapter supervises this process — start,
restart on crash, kill on shutdown — and never asks the user to run
it by hand.
## Why a sidecar at all?
Photon publishes webhooks (inbound) but their docs state explicitly:
> Pass `space.id` to `Space.send(...)` from a separate `spectrum-ts`
> SDK instance to reply. No public HTTP send endpoint exists today.
— https://photon.codes/docs/webhooks/events
When Photon ships an HTTP send endpoint, the plan is to retire this
sidecar entirely and call it directly from Python. The plugin's
outbound code path is already isolated behind a single helper
(`_sidecar_send` in `adapter.py`) to make that swap a one-file change.

View file

@ -0,0 +1,221 @@
// Hermes Agent — Photon Spectrum sidecar
//
// Spawned by `plugins/platforms/photon/adapter.py` to bridge outbound
// messaging to Photon's Spectrum platform. Inbound messages go directly
// from Photon's webhook to Hermes' Python aiohttp receiver — this
// sidecar handles ONLY outbound calls (which require the spectrum-ts
// SDK because Photon has no public HTTP send endpoint today).
//
// Protocol:
// - The sidecar listens on http://127.0.0.1:${PORT} (loopback only)
// - Each request must include `X-Hermes-Sidecar-Token: ${TOKEN}`
// - POST /healthz -> {"ok": true}
// - POST /send -> {"ok": true, "messageId": "..."}
// body: {"spaceId": "...", "text": "...", "replyTo": "..." | null}
// - POST /typing -> {"ok": true}
// body: {"spaceId": "..."}
// - POST /shutdown -> {"ok": true}; then process exits
//
// On SIGINT/SIGTERM the sidecar calls `app.stop()` (3s graceful) before
// exiting. Errors are logged to stderr; Python supervises restart.
//
// Env vars (all required):
// PHOTON_PROJECT_ID
// PHOTON_PROJECT_SECRET
// PHOTON_SIDECAR_PORT
// PHOTON_SIDECAR_TOKEN
//
// Optional:
// PHOTON_SIDECAR_BIND (default 127.0.0.1)
// PHOTON_API_HOST (passed through to spectrum-ts if its config
// honours it)
import http from "node:http";
const projectId = process.env.PHOTON_PROJECT_ID;
const projectSecret = process.env.PHOTON_PROJECT_SECRET;
const port = parseInt(process.env.PHOTON_SIDECAR_PORT || "8789", 10);
const bind = process.env.PHOTON_SIDECAR_BIND || "127.0.0.1";
const sharedToken = process.env.PHOTON_SIDECAR_TOKEN;
if (!projectId || !projectSecret || !sharedToken) {
console.error(
"photon-sidecar: PHOTON_PROJECT_ID, PHOTON_PROJECT_SECRET and " +
"PHOTON_SIDECAR_TOKEN must all be set."
);
process.exit(2);
}
// Lazy-load spectrum-ts so a missing install fails with a clear message
// instead of a cryptic module-resolution error during import.
let Spectrum, imessage;
try {
({ Spectrum } = await import("spectrum-ts"));
({ imessage } = await import("spectrum-ts/providers/imessage"));
} catch (e) {
console.error(
"photon-sidecar: spectrum-ts is not installed. Run `npm install` " +
"inside plugins/platforms/photon/sidecar/. Original error: " +
(e && e.stack ? e.stack : String(e))
);
process.exit(3);
}
const app = await Spectrum({
projectId,
projectSecret,
providers: [imessage.config()],
});
// Drain the inbound stream — Photon's webhook is the canonical inbound
// path, but we still consume `app.messages` so spectrum-ts' internal
// reconnect/heartbeat logic keeps running. Each event is logged at
// debug level; everything else is a no-op here.
(async () => {
try {
for await (const [, message] of app.messages) {
console.error(
`photon-sidecar: drained inbound from ${message.platform} ` +
`space=${message.space?.id}`
);
}
} catch (e) {
console.error(
"photon-sidecar: inbound stream errored: " +
(e && e.stack ? e.stack : String(e))
);
}
})();
async function readBody(req) {
const chunks = [];
for await (const chunk of req) chunks.push(chunk);
const raw = Buffer.concat(chunks).toString("utf-8");
if (!raw) return {};
try {
return JSON.parse(raw);
} catch (e) {
throw new Error("invalid JSON body");
}
}
function unauthorized(res) {
res.statusCode = 401;
res.setHeader("Content-Type", "application/json");
res.end(JSON.stringify({ ok: false, error: "unauthorized" }));
}
function badRequest(res, msg) {
res.statusCode = 400;
res.setHeader("Content-Type", "application/json");
res.end(JSON.stringify({ ok: false, error: msg }));
}
function serverError(res, msg) {
res.statusCode = 500;
res.setHeader("Content-Type", "application/json");
res.end(JSON.stringify({ ok: false, error: msg }));
}
function ok(res, data) {
res.statusCode = 200;
res.setHeader("Content-Type", "application/json");
res.end(JSON.stringify({ ok: true, ...data }));
}
async function resolveSpace(spaceId) {
// spectrum-ts exposes the same Space methods via `app.space(spaceId)` /
// narrowed helpers; we fall back through a few accessor shapes to
// tolerate small SDK API drift.
if (typeof app.space === "function") {
return await app.space(spaceId);
}
if (app.spaces && typeof app.spaces.get === "function") {
return await app.spaces.get(spaceId);
}
// Last resort — the platform-narrowed helper.
if (imessage) {
const im = imessage(app);
if (typeof im.space === "function") {
try {
return await im.space({ id: spaceId });
} catch {
/* fall through */
}
}
}
throw new Error(`unable to resolve space id ${spaceId}`);
}
const server = http.createServer(async (req, res) => {
if (req.headers["x-hermes-sidecar-token"] !== sharedToken) {
return unauthorized(res);
}
if (req.method !== "POST") {
res.statusCode = 405;
return res.end();
}
try {
if (req.url === "/healthz") {
return ok(res, {});
}
if (req.url === "/shutdown") {
ok(res, {});
setTimeout(() => process.kill(process.pid, "SIGTERM"), 50);
return;
}
const body = await readBody(req);
if (req.url === "/send") {
const { spaceId, text, replyTo } = body || {};
if (!spaceId || typeof text !== "string") {
return badRequest(res, "spaceId and text are required");
}
const space = await resolveSpace(spaceId);
const result = replyTo
? await space.send(text, { replyTo })
: await space.send(text);
return ok(res, { messageId: result?.id || result?.messageId || null });
}
if (req.url === "/typing") {
const { spaceId } = body || {};
if (!spaceId) return badRequest(res, "spaceId is required");
const space = await resolveSpace(spaceId);
if (typeof space.typing === "function") {
await space.typing();
} else if (typeof space.setTyping === "function") {
await space.setTyping(true);
}
return ok(res, {});
}
res.statusCode = 404;
res.setHeader("Content-Type", "application/json");
return res.end(JSON.stringify({ ok: false, error: "not found" }));
} catch (e) {
console.error(
"photon-sidecar: handler error: " +
(e && e.stack ? e.stack : String(e))
);
return serverError(res, String((e && e.message) || e));
}
});
server.listen(port, bind, () => {
console.error(`photon-sidecar: listening on ${bind}:${port}`);
});
async function shutdown(signal) {
console.error(`photon-sidecar: received ${signal}, stopping...`);
try {
await Promise.race([
app.stop(),
new Promise((resolve) => setTimeout(resolve, 3000)),
]);
} catch (e) {
console.error("photon-sidecar: app.stop() failed: " + String(e));
}
server.close(() => process.exit(0));
setTimeout(() => process.exit(1), 500).unref();
}
process.on("SIGINT", () => shutdown("SIGINT"));
process.on("SIGTERM", () => shutdown("SIGTERM"));

View file

@ -0,0 +1,17 @@
{
"name": "@hermes-agent/photon-sidecar",
"private": true,
"version": "0.1.0",
"description": "Spectrum-ts bridge for the Hermes Agent Photon platform plugin.",
"type": "module",
"main": "index.mjs",
"scripts": {
"start": "node index.mjs"
},
"engines": {
"node": ">=18.17"
},
"dependencies": {
"spectrum-ts": "^0.1.0"
}
}

View file

@ -0,0 +1 @@
"""Unit tests for the Photon Spectrum platform plugin."""

View file

@ -0,0 +1,211 @@
"""Tests for the Photon auth module (device login + project + user creation)."""
from __future__ import annotations
import json
import time
from pathlib import Path
from typing import Any, Dict
import pytest
from plugins.platforms.photon import auth as photon_auth
# ---------------------------------------------------------------------------
# Fake httpx — we don't want to hit the real Photon API in unit tests.
class _FakeResponse:
def __init__(
self,
*,
status: int = 200,
json_body: Any = None,
headers: Dict[str, str] | None = None,
text: str = "",
) -> None:
self.status_code = status
self._json = json_body if json_body is not None else {}
self.headers = headers or {}
self.text = text
def json(self) -> Any:
return self._json
def raise_for_status(self) -> None:
if self.status_code >= 400:
raise RuntimeError(f"HTTP {self.status_code}")
@pytest.fixture
def tmp_hermes_home(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
home = tmp_path / "hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
# The auth module memoises by reading get_hermes_home at call time
# so the env var is what matters.
return home
def test_store_and_load_photon_token(tmp_hermes_home: Path) -> None:
photon_auth.store_photon_token("abc123def456")
assert photon_auth.load_photon_token() == "abc123def456"
auth_json = json.loads((tmp_hermes_home / "auth.json").read_text())
assert "credential_pool" in auth_json
assert auth_json["credential_pool"]["photon"][0]["access_token"] == "abc123def456"
def test_store_and_load_project_credentials(tmp_hermes_home: Path) -> None:
photon_auth.store_project_credentials(
"proj-uuid", "secret-key", name="Test Project",
)
pid, secret = photon_auth.load_project_credentials()
assert pid == "proj-uuid"
assert secret == "secret-key"
def test_load_project_credentials_env_override(
tmp_hermes_home: Path, monkeypatch: pytest.MonkeyPatch,
) -> None:
photon_auth.store_project_credentials("from-file", "secret-file")
monkeypatch.setenv("PHOTON_PROJECT_ID", "from-env")
monkeypatch.setenv("PHOTON_PROJECT_SECRET", "secret-env")
pid, secret = photon_auth.load_project_credentials()
assert pid == "from-env"
assert secret == "secret-env"
def test_request_device_code(monkeypatch: pytest.MonkeyPatch) -> None:
captured: Dict[str, Any] = {}
def fake_post(url: str, *, json: Dict[str, Any], timeout: float) -> _FakeResponse:
captured["url"] = url
captured["body"] = json
return _FakeResponse(json_body={
"device_code": "dev-code-xyz",
"user_code": "ABCD-1234",
"verification_uri": "https://app.photon.codes/device",
"verification_uri_complete": "https://app.photon.codes/device?code=ABCD-1234",
"expires_in": 600,
"interval": 5,
})
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
code = photon_auth.request_device_code()
assert code.device_code == "dev-code-xyz"
assert code.user_code == "ABCD-1234"
assert code.expires_in == 600
assert "/api/auth/device/code" in captured["url"]
assert captured["body"]["client_id"] == "hermes-agent"
def test_poll_for_token_via_header(monkeypatch: pytest.MonkeyPatch) -> None:
"""Token from set-auth-token header is the documented mechanism."""
def fake_post(url: str, *, json: Dict[str, Any], timeout: float) -> _FakeResponse:
return _FakeResponse(
status=200,
json_body={"session": {}, "user": {}},
headers={"set-auth-token": "bearer-xyz"},
)
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
code = photon_auth.DeviceCode(
device_code="d", user_code="u",
verification_uri="https://x", verification_uri_complete=None,
expires_in=10, interval=0,
)
token = photon_auth.poll_for_token(code, interval=0, timeout=2)
assert token == "bearer-xyz"
def test_poll_for_token_via_body_fallback(monkeypatch: pytest.MonkeyPatch) -> None:
"""If the header is absent we fall back to session.access_token."""
def fake_post(url: str, *, json: Dict[str, Any], timeout: float) -> _FakeResponse:
return _FakeResponse(
status=200,
json_body={"session": {"access_token": "from-body"}, "user": {}},
)
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
code = photon_auth.DeviceCode(
device_code="d", user_code="u",
verification_uri="https://x", verification_uri_complete=None,
expires_in=10, interval=0,
)
assert photon_auth.poll_for_token(code, interval=0, timeout=2) == "from-body"
def test_poll_for_token_propagates_access_denied(
monkeypatch: pytest.MonkeyPatch,
) -> None:
def fake_post(url: str, *, json: Dict[str, Any], timeout: float) -> _FakeResponse:
return _FakeResponse(
status=400, json_body={"error": "access_denied"},
)
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
code = photon_auth.DeviceCode(
device_code="d", user_code="u",
verification_uri="https://x", verification_uri_complete=None,
expires_in=10, interval=0,
)
with pytest.raises(RuntimeError, match="access_denied"):
photon_auth.poll_for_token(code, interval=0, timeout=2)
def test_create_user_rejects_invalid_phone() -> None:
with pytest.raises(ValueError, match="E.164"):
photon_auth.create_user(
"proj", "secret", phone_number="not-a-number",
)
def test_create_user_posts_shared_type(monkeypatch: pytest.MonkeyPatch) -> None:
captured: Dict[str, Any] = {}
def fake_post(url: str, *, json: Dict[str, Any], auth: tuple, timeout: float) -> _FakeResponse:
captured["url"] = url
captured["body"] = json
captured["auth"] = auth
return _FakeResponse(json_body={
"succeed": True,
"data": {
"id": "user-uuid",
"phoneNumber": "+15551234567",
"assignedPhoneNumber": "+15559999999",
},
})
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
user = photon_auth.create_user(
"proj-id", "proj-secret",
phone_number="+15551234567",
)
assert user["assignedPhoneNumber"] == "+15559999999"
assert captured["auth"] == ("proj-id", "proj-secret")
assert captured["body"]["type"] == "shared"
assert captured["body"]["phoneNumber"] == "+15551234567"
assert "/projects/proj-id/users/" in captured["url"]
def test_register_webhook_surfaces_secret(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_post(url: str, *, json: Dict[str, Any], auth: tuple, timeout: float) -> _FakeResponse:
return _FakeResponse(json_body={
"succeed": True,
"data": {
"id": "wh-uuid",
"webhookUrl": json["webhookUrl"],
"signingSecret": "0" * 64,
},
})
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
data = photon_auth.register_webhook(
"proj", "secret", webhook_url="https://x.example.com/hook",
)
assert data["signingSecret"] == "0" * 64
assert data["webhookUrl"] == "https://x.example.com/hook"

View file

@ -0,0 +1,139 @@
"""Inbound dispatch + dedup tests for PhotonAdapter.
These tests bypass the aiohttp server they call ``_dispatch_inbound``
and ``_is_duplicate`` directly. That keeps them fast and means we can
exercise the message-shape parsing logic without binding ports.
"""
from __future__ import annotations
from typing import List
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from plugins.platforms.photon.adapter import PhotonAdapter
def _make_adapter(monkeypatch: pytest.MonkeyPatch) -> PhotonAdapter:
# Avoid touching real auth.json / env.
monkeypatch.setenv("PHOTON_PROJECT_ID", "test-project-id")
monkeypatch.setenv("PHOTON_PROJECT_SECRET", "test-project-secret")
monkeypatch.delenv("PHOTON_WEBHOOK_SECRET", raising=False)
cfg = PlatformConfig(enabled=True, token="", extra={})
return PhotonAdapter(cfg)
@pytest.mark.asyncio
async def test_dispatch_text_dm(monkeypatch: pytest.MonkeyPatch) -> None:
adapter = _make_adapter(monkeypatch)
captured: List[MessageEvent] = []
async def fake_handle(event: MessageEvent) -> None:
captured.append(event)
adapter.handle_message = fake_handle # type: ignore[assignment]
payload = {
"event": "messages",
"space": {"id": "any;-;+15551234567", "platform": "iMessage"},
"message": {
"id": "spc-msg-abc",
"platform": "iMessage",
"direction": "inbound",
"timestamp": "2026-05-14T19:06:32.000Z",
"sender": {"id": "+15551234567", "platform": "iMessage"},
"space": {"id": "any;-;+15551234567", "platform": "iMessage"},
"content": {"type": "text", "text": "hello world"},
},
}
await adapter._dispatch_inbound(payload)
assert len(captured) == 1
event = captured[0]
assert event.text == "hello world"
assert event.message_type == MessageType.TEXT
assert event.message_id == "spc-msg-abc"
src = event.source
assert src is not None
assert src.platform == Platform("photon")
assert src.chat_id == "any;-;+15551234567"
assert src.chat_type == "dm"
assert src.user_id == "+15551234567"
@pytest.mark.asyncio
async def test_dispatch_group_id_detected(monkeypatch: pytest.MonkeyPatch) -> None:
adapter = _make_adapter(monkeypatch)
captured: List[MessageEvent] = []
async def fake_handle(event: MessageEvent) -> None:
captured.append(event)
adapter.handle_message = fake_handle # type: ignore[assignment]
payload = {
"event": "messages",
"space": {"id": "any;+;group-guid-xyz", "platform": "iMessage"},
"message": {
"id": "spc-msg-grp",
"timestamp": "2026-05-14T19:06:32.000Z",
"sender": {"id": "+15551234567"},
"space": {"id": "any;+;group-guid-xyz"},
"content": {"type": "text", "text": "hi group"},
},
}
await adapter._dispatch_inbound(payload)
assert captured[0].source.chat_type == "group"
@pytest.mark.asyncio
async def test_dispatch_attachment_surfaces_marker(
monkeypatch: pytest.MonkeyPatch,
) -> None:
adapter = _make_adapter(monkeypatch)
captured: List[MessageEvent] = []
async def fake_handle(event: MessageEvent) -> None:
captured.append(event)
adapter.handle_message = fake_handle # type: ignore[assignment]
payload = {
"event": "messages",
"message": {
"id": "spc-msg-att",
"timestamp": "2026-05-14T19:06:32.000Z",
"sender": {"id": "+15551234567"},
"space": {"id": "any;-;+15551234567"},
"content": {
"type": "attachment",
"name": "IMG_4127.HEIC",
"mimeType": "image/heic",
"size": 12345,
},
},
}
await adapter._dispatch_inbound(payload)
assert len(captured) == 1
event = captured[0]
# Attachment carries metadata marker; mime → MessageType.PHOTO.
assert "Photon attachment received" in event.text
assert "IMG_4127.HEIC" in event.text
assert event.message_type == MessageType.PHOTO
def test_is_duplicate_window(monkeypatch: pytest.MonkeyPatch) -> None:
adapter = _make_adapter(monkeypatch)
assert adapter._is_duplicate("id-1") is False
assert adapter._is_duplicate("id-1") is True
assert adapter._is_duplicate("id-2") is False
assert adapter._is_duplicate("id-1") is True # still dup
def test_check_requirements_without_node(monkeypatch: pytest.MonkeyPatch) -> None:
# If no node binary on PATH the adapter should refuse to start.
from plugins.platforms.photon import adapter as adapter_mod
monkeypatch.setattr(adapter_mod.shutil, "which", lambda _name: None)
assert adapter_mod.check_requirements() is False

View file

@ -0,0 +1,95 @@
"""Signature verification tests for the Photon webhook receiver."""
from __future__ import annotations
import hashlib
import hmac
import time
import pytest
from plugins.platforms.photon.adapter import verify_signature
def _sign(secret: str, body: bytes, ts: int) -> str:
return "v0=" + hmac.new(
secret.encode(), f"v0:{ts}:".encode() + body, hashlib.sha256,
).hexdigest()
def test_accepts_valid_signature() -> None:
secret = "topsecret-32chars-or-whatever"
body = b'{"event":"messages"}'
ts = int(time.time())
sig = _sign(secret, body, ts)
assert verify_signature(
body=body, timestamp_header=str(ts), signature_header=sig,
signing_secret=secret,
)
def test_rejects_tampered_body() -> None:
secret = "s"
body = b'{"event":"messages"}'
ts = int(time.time())
sig = _sign(secret, body, ts)
assert not verify_signature(
body=body + b" tamper", timestamp_header=str(ts),
signature_header=sig, signing_secret=secret,
)
def test_rejects_wrong_secret() -> None:
body = b"x"
ts = int(time.time())
sig = _sign("right", body, ts)
assert not verify_signature(
body=body, timestamp_header=str(ts), signature_header=sig,
signing_secret="wrong",
)
def test_rejects_drifted_timestamp() -> None:
secret = "s"
body = b"x"
ts = int(time.time()) - 3600 # 1h old; drift window is 5 min
sig = _sign(secret, body, ts)
assert not verify_signature(
body=body, timestamp_header=str(ts), signature_header=sig,
signing_secret=secret,
)
def test_rejects_missing_v0_prefix() -> None:
secret = "s"
body = b"x"
ts = int(time.time())
raw_hex = hmac.new(
secret.encode(), f"v0:{ts}:".encode() + body, hashlib.sha256,
).hexdigest()
# Strip the "v0=" prefix — verify_signature must reject.
assert not verify_signature(
body=body, timestamp_header=str(ts), signature_header=raw_hex,
signing_secret=secret,
)
def test_rejects_empty_inputs() -> None:
assert not verify_signature(
body=b"x", timestamp_header="", signature_header="v0=abc",
signing_secret="s",
)
assert not verify_signature(
body=b"x", timestamp_header="123", signature_header="",
signing_secret="s",
)
assert not verify_signature(
body=b"x", timestamp_header="123", signature_header="v0=abc",
signing_secret="",
)
def test_rejects_non_integer_timestamp() -> None:
assert not verify_signature(
body=b"x", timestamp_header="not-an-int",
signature_header="v0=abc", signing_secret="s",
)

View file

@ -0,0 +1,167 @@
---
sidebar_position: 18
---
# Photon iMessage
Connect Hermes to **iMessage** through [Photon][photon], a managed
service that handles the Apple line allocation and abuse-prevention
layer so you don't have to run your own Mac relay.
The free tier uses Photon's shared iMessage line pool — different
recipients may see different sending numbers, but each conversation
stays stable. The paid Business tier gives every user the same
dedicated number; the plugin supports both, and the free tier is the
recommended starting point.
:::info Free to start
Photon's shared-line pool is free. No subscription is required to send
your first iMessage from Hermes — just a phone number we can bind to
your account.
:::
## Architecture
Inbound messages arrive as **signed webhooks**: Photon POSTs JSON with
an `X-Spectrum-Signature` header to a URL you register, and Hermes'
aiohttp listener verifies the HMAC-SHA256 signature before dispatching
the event into the agent.
Outbound replies go through a small supervised **Node sidecar** that
runs the `spectrum-ts` SDK on loopback. Photon does not currently
expose a public HTTP send-message endpoint — that's a roadmap item on
their side — so until then the sidecar is the only way to call
`Space.send(...)`. The Python plugin starts, supervises, and shuts
down the sidecar automatically. When Photon ships an HTTP send
endpoint we'll retire the sidecar in a follow-up release.
## Prerequisites
- A Photon account — sign up at [app.photon.codes][app]
- **Node.js 18.17 or newer** on PATH (`node --version`)
- A phone number that can receive iMessage (used to bind your account)
- A publicly reachable URL for the webhook receiver — Cloudflare
Tunnel, ngrok, or your own gateway hostname all work
## First-time setup
```bash
# Device-code login + project + user + sidecar deps, all in one
hermes photon setup --phone +15551234567
```
The wizard:
1. Opens `https://app.photon.codes/` for device approval
2. Creates a Spectrum-enabled project under your account
3. Calls the Spectrum `create-user` endpoint with `type: shared` so
Photon allocates an iMessage line from the free pool
4. Runs `npm install` inside the plugin's sidecar directory
Credentials are stored in `~/.hermes/auth.json` under
`credential_pool.photon` (bearer token) and
`credential_pool.photon_project` (project id + secret).
## Registering the webhook
Photon needs a public URL it can POST to. Expose your local listener
(default port 8788, path `/photon/webhook`) via Cloudflare Tunnel or
ngrok, then:
```bash
hermes photon webhook register https://YOUR-PUBLIC-URL/photon/webhook
```
The response includes a `signingSecret` — **Photon only returns it
once.** Save it to `~/.hermes/.env`:
```bash
PHOTON_WEBHOOK_SECRET=v0_64-char-hex...
```
The plugin verifies every inbound `POST` against this secret and
rejects deliveries with a timestamp drift greater than 5 minutes.
## Start the gateway
```bash
hermes gateway start --platform photon
```
You'll see something like:
```
[photon] connected — webhook at 0.0.0.0:8788/photon/webhook, sidecar on 127.0.0.1:8789
```
Send an iMessage to your assigned number and Hermes will reply.
## Status & troubleshooting
```bash
hermes photon status
```
Prints:
```
Photon iMessage status
──────────────────────
device token : ✓ stored
project id : 3c90c3cc-0d44-4b50-...
project secret : ✓ stored
node binary : /usr/bin/node
sidecar deps : ✓ installed
webhook secret : ✓ set
```
Common issues:
- **`sidecar deps : ✗ run hermes photon install-sidecar`** — Node is
installed but `spectrum-ts` isn't. Run the suggested command.
- **`webhook secret : ⚠ unset — verification disabled`** — the
plugin will accept ANY POST to the webhook URL, which is unsafe.
Re-run `hermes photon webhook register` and store the secret.
- **`PHOTON_WEBHOOK_PORT` already in use** — set a different port via
`~/.hermes/.env`.
- **Webhook reachable from localhost but Photon can't deliver**
Photon needs a public hostname. Cloudflare Tunnel is the easiest
free option.
## Webhook management
```bash
hermes photon webhook list # show registered hooks
hermes photon webhook delete <webhook-id> # remove one
```
## Limits today
- **Attachments are metadata-only.** Inbound webhooks carry the
filename + MIME type but no download URL — Photon documents an
attachment retrieval endpoint as roadmap.
- **Outbound attachments not wired yet.** Easy to add in the sidecar
once the agent has reason to send them.
- **Photon's free quotas:** 5,000 messages per server per day,
50 new-conversation initiations per shared line per day. Increases
available — email `help@photon.codes`.
## Env vars
| Variable | Default | Notes |
|---------------------------|--------------------|--------------------------------------------|
| `PHOTON_PROJECT_ID` | from `auth.json` | Set by `hermes photon setup` |
| `PHOTON_PROJECT_SECRET` | from `auth.json` | Set by `hermes photon setup` |
| `PHOTON_WEBHOOK_SECRET` | (unset) | From `hermes photon webhook register` |
| `PHOTON_WEBHOOK_PORT` | `8788` | Local port for the aiohttp listener |
| `PHOTON_WEBHOOK_PATH` | `/photon/webhook` | Path under which the listener mounts |
| `PHOTON_WEBHOOK_BIND` | `0.0.0.0` | Bind address for the listener |
| `PHOTON_SIDECAR_PORT` | `8789` | Loopback port for sidecar control |
| `PHOTON_SIDECAR_AUTOSTART`| `true` | Whether the adapter spawns the sidecar |
| `PHOTON_NODE_BIN` | `which node` | Override the Node binary path |
| `PHOTON_HOME_CHANNEL` | (unset) | Default space ID for cron / notifications |
| `PHOTON_ALLOWED_USERS` | (unset) | Comma-separated E.164 allowlist |
| `PHOTON_ALLOW_ALL_USERS` | `false` | Dev only — accept any sender |
[photon]: https://photon.codes/
[app]: https://app.photon.codes/

View file

@ -647,6 +647,7 @@ const sidebars: SidebarsConfig = {
'user-guide/messaging/mattermost',
'user-guide/messaging/matrix',
'user-guide/messaging/bluebubbles',
'user-guide/messaging/photon',
'user-guide/messaging/google_chat',
'user-guide/messaging/line',
'user-guide/messaging/simplex',