diff --git a/gateway/platforms/ADDING_A_PLATFORM.md b/gateway/platforms/ADDING_A_PLATFORM.md index 80ebd27c5da..ffe67e046b1 100644 --- a/gateway/platforms/ADDING_A_PLATFORM.md +++ b/gateway/platforms/ADDING_A_PLATFORM.md @@ -33,6 +33,17 @@ status display, gateway setup, and more. auto-populate `OPTIONAL_ENV_VARS` in `hermes_cli/config.py` so the setup wizard surfaces proper descriptions, prompts, password flags, and URLs. +**Subclassing for platform-specific UX.** When a platform has a hard +time-window constraint that the base adapter can't anticipate (LINE's +60s single-use reply token, WhatsApp's 24h session window, etc.), an +adapter can override `_keep_typing` to layer a mid-flight bubble at a +threshold without expanding the kwarg surface. Always +`await super()._keep_typing(...)` so the typing heartbeat keeps running, +and tear down your side task in `finally`. See `plugins/platforms/line/` +for the full pattern (Template Buttons postback at 45s, `RequestCache` +state machine, `interrupt_session_activity` override for `/stop` +orphans) and the developer-guide page for the prose walkthrough. + See `plugins/platforms/irc/`, `plugins/platforms/teams/`, and `plugins/platforms/google_chat/` for complete working examples, and `website/docs/developer-guide/adding-platform-adapters.md` for the full diff --git a/plugins/platforms/line/__init__.py b/plugins/platforms/line/__init__.py new file mode 100644 index 00000000000..d4f1d7bf0e3 --- /dev/null +++ b/plugins/platforms/line/__init__.py @@ -0,0 +1,3 @@ +from .adapter import register + +__all__ = ["register"] diff --git a/plugins/platforms/line/adapter.py b/plugins/platforms/line/adapter.py new file mode 100644 index 00000000000..67582ffae8d --- /dev/null +++ b/plugins/platforms/line/adapter.py @@ -0,0 +1,1638 @@ +""" +LINE Messaging API platform adapter for Hermes Agent. + +A bundled platform plugin that runs an aiohttp webhook server, accepts LINE +webhook events (signature-verified), and relays messages to/from the agent +via the standard ``BasePlatformAdapter`` interface. + +Design highlights +----------------- + +**Reply token preferred, Push fallback.** LINE's reply token is single-use +and expires roughly 60 seconds after the inbound event. We try Reply first +(it's free) and fall back to the metered Push API when the token is absent, +expired, or rejected by the API. + +**Slow-LLM postback button (optional).** When the LLM is still running past +``slow_response_threshold`` seconds (default 45, leaving 15s margin on the +60s reply-token TTL), we burn the original reply token to send a Template +Buttons bubble — the user taps it later to receive the cached answer via a +*fresh* reply token (also free). State machine: PENDING → READY → DELIVERED, +with ERROR for cancelled runs. Set the threshold to 0 to disable the +button and always Push-fallback instead. + +**Three-allowlist gating.** Separate allowlists for users (U-prefixed), +groups (C-prefixed), and rooms (R-prefixed). ``LINE_ALLOW_ALL_USERS=true`` +is a dev-only escape hatch. + +**Media via public HTTPS.** LINE's Messaging API does *not* accept +binary uploads — images, audio, and video must be reachable HTTPS URLs. +We register registered tempfiles under ``/line/media//`` +served by the same aiohttp app, with an allowed-roots traversal guard. +``LINE_PUBLIC_URL`` (e.g. ``https://my-tunnel.example.com``) overrides +the host:port construction so URLs are reachable when bind is 0.0.0.0 +or behind a reverse proxy. + +**5-message batching.** LINE accepts at most 5 message objects per +Reply/Push call; longer responses are smart-chunked at 4500 chars +(LINE per-bubble limit is 5000) and batched. + +Synthesis credits +----------------- + +This file is a synthesis of seven open community PRs adding LINE support +to Hermes Agent. It deliberately ports the *strongest* idea from each into +a single plugin-form module that requires zero core edits: + +* PR #18153 (leepoweii) — Template Buttons postback cache state machine, + Markdown URL preservation, system-message bypass. +* PR #8398 (yuga-hashimoto) — media URL serving with traversal guard, + send_voice / send_video, ``LINE_PUBLIC_URL`` env, macOS ``/tmp`` root. +* PR #16832 (jethac) — config wiring style, voice/image tests. +* PR #21023 (perng) — plugin-form skeleton (the only one already + modeled on ``ADDING_A_PLATFORM.md``), reply→push fallback at 50s TTL, + loading-animation indicator, source dispatcher. +* PR #14942 (soichiyo) — Cloudflare-tunnel operating model (docs only). +* PR #14988 (David-0x221Eight) — text-first scope discipline. +* PR #6676 (liyoungc) — Push-only mode (used as the ``threshold=0`` + fallback path here). +""" + +from __future__ import annotations + +import asyncio +import base64 +import enum +import hashlib +import hmac +import json +import logging +import mimetypes +import os +import re +import secrets +import tempfile +import time +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Awaitable, Callable, Dict, List, Optional, Set, Tuple +from urllib.parse import quote as _urlquote + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Lazy / function-level imports for gateway internals are NOT used here — +# the plugin discovery flow imports adapter.py late enough that gateway is +# already loaded. +# --------------------------------------------------------------------------- + +from gateway.platforms.base import ( + BasePlatformAdapter, + MessageEvent, + MessageType, + SendResult, + cache_image_from_bytes, +) +from gateway.config import Platform +from gateway.session import SessionSource + + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +LINE_REPLY_URL = "https://api.line.me/v2/bot/message/reply" +LINE_PUSH_URL = "https://api.line.me/v2/bot/message/push" +LINE_LOADING_URL = "https://api.line.me/v2/bot/chat/loading/start" +LINE_CONTENT_URL_FMT = "https://api-data.line.me/v2/bot/message/{message_id}/content" +LINE_BOT_INFO_URL = "https://api.line.me/v2/bot/info" + +# LINE Messaging API hard limits +LINE_PER_BUBBLE_CHARS = 5000 # Hard limit per text message object +LINE_SAFE_BUBBLE_CHARS = 4500 # Conservative limit for chunking +LINE_MAX_MESSAGES_PER_CALL = 5 # API rejects >5 messages per Reply/Push +LINE_REPLY_TOKEN_TTL_SECONDS = 50 # Conservative cap below LINE's ~60s + +# Webhook hardening +WEBHOOK_BODY_MAX_BYTES = 1_048_576 # 1 MiB — webhooks are tiny JSON +DEFAULT_WEBHOOK_PORT = 8646 +DEFAULT_WEBHOOK_PATH = "/line/webhook" +DEFAULT_MEDIA_PATH_PREFIX = "/line/media" + +# Slow-LLM postback button defaults +DEFAULT_SLOW_RESPONSE_THRESHOLD = 45.0 # seconds; 0 disables +DEFAULT_PENDING_REPLY_TEXT = ( + "🤔 Still thinking. Tap below to fetch the answer when it's ready." +) +DEFAULT_BUTTON_LABEL = "Get answer" +DEFAULT_DELIVERED_TEXT = "Already replied ✅" +DEFAULT_INTERRUPTED_TEXT = "Run was interrupted before completion." + +# Media defaults +MEDIA_TOKEN_TTL_SECONDS = 1800 # 30 minutes; LINE caches the URL aggressively +LINE_IMAGE_MAX_BYTES = 10 * 1024 * 1024 # 10 MB per LINE docs +LINE_AV_MAX_BYTES = 200 * 1024 * 1024 # 200 MB for voice/video + +# A 1×1 transparent PNG used as fallback video preview thumbnail when no +# explicit preview is supplied — LINE requires ``previewImageUrl`` for +# video messages. Sourced from the Python stdlib (no Pillow dependency). +_FALLBACK_PNG_PREVIEW = bytes.fromhex( + "89504e470d0a1a0a0000000d49484452000000010000000108060000001f15c4" + "890000000d49444154789c63000100000005000100377a7ff20000000049454e" + "44ae426082" +) + + +# --------------------------------------------------------------------------- +# Markdown stripping (URL-preserving) +# --------------------------------------------------------------------------- + +_MD_LINK_RE = re.compile(r"\[([^\]]+)\]\((https?://[^\s)]+)\)") +_MD_BOLD_RE = re.compile(r"\*\*(.+?)\*\*") +_MD_ITAL_RE = re.compile(r"(? str: + """Strip Markdown that LINE can't render, but keep URLs usable. + + LINE's text bubble has zero Markdown support — bold, italics, code + fences, headings, and bullet markers all render as literal characters. + URLs *are* auto-linked by the client, but only when they appear bare + (not inside ``[label](url)`` syntax). This converts ``[label](url)`` + to ``label (url)`` so the URL remains tappable, then strips the rest. + + Source: PR #18153 (leepoweii) — adapted to keep code-block content + visible (LINE users frequently want command snippets to land as + plain text, not be eaten by the fence). + """ + if not text: + return text + + # Code blocks first — keep the inner content, drop the fences. + def _unfence(m: re.Match) -> str: + return m.group(1).rstrip("\n") + text = _MD_CODE_BLOCK_RE.sub(_unfence, text) + + # Inline code: keep content, drop backticks. + text = _MD_CODE_INLINE_RE.sub(r"\1", text) + + # Markdown links → "label (url)" + text = _MD_LINK_RE.sub(lambda m: f"{m.group(1)} ({m.group(2)})", text) + + # Bold/italic markers — strip. + text = _MD_BOLD_RE.sub(r"\1", text) + text = _MD_ITAL_RE.sub(r"\1", text) + + # Headings (#, ##) and bullet markers — strip the prefix only. + text = _MD_HEADING_RE.sub("", text) + text = _MD_BULLET_RE.sub("• ", text) + + return text + + +def split_for_line(text: str, max_chars: int = LINE_SAFE_BUBBLE_CHARS) -> List[str]: + """Split ``text`` into LINE-sized bubbles, preferring paragraph/line breaks. + + Returns at most ``LINE_MAX_MESSAGES_PER_CALL`` chunks; longer text is + truncated with an ellipsis on the final chunk to keep the response + deliverable in a single Reply/Push call. + """ + if not text: + return [] + if len(text) <= max_chars: + return [text] + + chunks: List[str] = [] + remaining = text + while remaining and len(chunks) < LINE_MAX_MESSAGES_PER_CALL: + if len(remaining) <= max_chars: + chunks.append(remaining) + remaining = "" + break + # Try to break on the latest paragraph or newline within budget. + cut = remaining.rfind("\n\n", 0, max_chars) + if cut < int(max_chars * 0.5): + cut = remaining.rfind("\n", 0, max_chars) + if cut < int(max_chars * 0.5): + cut = remaining.rfind(" ", 0, max_chars) + if cut <= 0: + cut = max_chars + chunks.append(remaining[:cut].rstrip()) + remaining = remaining[cut:].lstrip() + + if remaining: + # Truncate gracefully — caller already burned its 5-bubble budget. + if chunks: + tail = chunks[-1] + if len(tail) > max_chars - 1: + tail = tail[: max_chars - 1] + chunks[-1] = tail.rstrip() + "…" + else: + chunks.append(remaining[: max_chars - 1] + "…") + return chunks + + +# --------------------------------------------------------------------------- +# Webhook signature verification +# --------------------------------------------------------------------------- + +def verify_line_signature(body: bytes, signature: str, channel_secret: str) -> bool: + """Verify a LINE webhook's ``X-Line-Signature`` header. + + LINE signs the *raw* request body with HMAC-SHA256 keyed by the + channel secret, then base64-encodes the digest. Constant-time + comparison defends against timing oracles. + """ + if not signature or not channel_secret or body is None: + return False + try: + digest = hmac.new( + channel_secret.encode("utf-8"), + body, + hashlib.sha256, + ).digest() + expected = base64.b64encode(digest).decode("utf-8") + except Exception: + return False + return hmac.compare_digest(expected, signature) + + +# --------------------------------------------------------------------------- +# Cache state machine — slow-LLM postback flow +# --------------------------------------------------------------------------- + +class State(enum.Enum): + PENDING = "pending" # button sent, LLM still running + READY = "ready" # LLM done, response cached, waiting for postback tap + DELIVERED = "delivered" + ERROR = "error" # LLM raised / interrupted; cached error text waiting + + +@dataclass +class _CacheEntry: + state: State + payload: Any = None + chat_id: str = "" + created_at: float = field(default_factory=time.time) + updated_at: float = field(default_factory=time.time) + + +class RequestCache: + """In-memory cache for slow-LLM postback retrieval. + + PRs #18153 originally combined two TTLs — one for PENDING (24h) and + a shorter one for READY/DELIVERED/ERROR (1h). We keep the same model + here. + """ + + def __init__( + self, + ttl_seconds: int = 3600, + pending_ttl_seconds: int = 86400, + ) -> None: + self._entries: Dict[str, _CacheEntry] = {} + self._ttl = ttl_seconds + self._pending_ttl = pending_ttl_seconds + + def register_pending(self, chat_id: str) -> str: + rid = str(uuid.uuid4()) + self._entries[rid] = _CacheEntry(state=State.PENDING, chat_id=chat_id) + return rid + + def get(self, request_id: str) -> Optional[_CacheEntry]: + return self._entries.get(request_id) + + def set_ready(self, request_id: str, payload: Any) -> None: + entry = self._entries.get(request_id) + if entry is None or entry.state is not State.PENDING: + return + entry.state = State.READY + entry.payload = payload + entry.updated_at = time.time() + + def set_error(self, request_id: str, message: str) -> None: + entry = self._entries.get(request_id) + if entry is None or entry.state is not State.PENDING: + return + entry.state = State.ERROR + entry.payload = message + entry.updated_at = time.time() + + def mark_delivered(self, request_id: str) -> None: + entry = self._entries.get(request_id) + if entry is None or entry.state not in (State.READY, State.ERROR): + return + entry.state = State.DELIVERED + entry.updated_at = time.time() + + def find_pending_for_chat(self, chat_id: str) -> Optional[str]: + for rid, entry in self._entries.items(): + if entry.state is State.PENDING and entry.chat_id == chat_id: + return rid + return None + + def prune(self) -> int: + now = time.time() + removed = 0 + for rid in list(self._entries.keys()): + entry = self._entries[rid] + if entry.state is State.PENDING: + if now - entry.created_at > self._pending_ttl: + del self._entries[rid] + removed += 1 + else: + if now - entry.updated_at > self._ttl: + del self._entries[rid] + removed += 1 + return removed + + +# --------------------------------------------------------------------------- +# Inbound dedup +# --------------------------------------------------------------------------- + +class _MessageDeduplicator: + """Bounded LRU of LINE webhook event IDs to ignore at-least-once retries.""" + + def __init__(self, max_size: int = 1000) -> None: + self._seen: Dict[str, float] = {} + self._max = max_size + + def is_duplicate(self, event_id: str) -> bool: + if not event_id: + return False + if event_id in self._seen: + return True + if len(self._seen) >= self._max: + # Drop the oldest 10% so we don't trim on every insert. + cutoff = sorted(self._seen.values())[len(self._seen) // 10 or 1] + self._seen = {k: v for k, v in self._seen.items() if v > cutoff} + self._seen[event_id] = time.time() + return False + + +# --------------------------------------------------------------------------- +# Source / chat-id resolution +# --------------------------------------------------------------------------- + +def _resolve_chat(source: Dict[str, Any]) -> Tuple[str, str]: + """Return ``(chat_id, chat_type)`` from a LINE event ``source`` block. + + LINE sources are one of: + * ``{"type": "user", "userId": "U..."}`` → 1:1 DM + * ``{"type": "group", "groupId": "C...", "userId": "U..."}`` → group chat + * ``{"type": "room", "roomId": "R...", "userId": "U..."}`` → multi-user room + + Source: PR #21023 (perng), unchanged. + """ + src_type = (source or {}).get("type", "") + if src_type == "group": + return source.get("groupId", ""), "group" + if src_type == "room": + return source.get("roomId", ""), "room" + if src_type == "user": + return source.get("userId", ""), "dm" + return "", "dm" + + +def _allowed_for_source( + source: Dict[str, Any], + *, + allow_all: bool, + user_ids: Set[str], + group_ids: Set[str], + room_ids: Set[str], +) -> bool: + """Three-list gate — credit PR #18153.""" + if allow_all: + return True + src_type = (source or {}).get("type", "") + if src_type == "user": + uid = source.get("userId", "") + return bool(uid) and uid in user_ids + if src_type == "group": + gid = source.get("groupId", "") + return bool(gid) and gid in group_ids + if src_type == "room": + rid = source.get("roomId", "") + return bool(rid) and rid in room_ids + return False + + +# --------------------------------------------------------------------------- +# LINE Reply / Push HTTP client +# --------------------------------------------------------------------------- + +class _LineClient: + """Thin async wrapper around the LINE Messaging API. + + We use ``aiohttp`` directly to avoid a ``line-bot-sdk`` dependency + (the SDK pulls in its own httpx pin and the ergonomic gain is small + for the four endpoints we actually call). + """ + + def __init__(self, channel_access_token: str, *, timeout: float = 15.0) -> None: + self._token = channel_access_token + self._timeout = timeout + self._headers = { + "Authorization": f"Bearer {channel_access_token}", + "Content-Type": "application/json", + } + + async def reply(self, reply_token: str, messages: List[Dict[str, Any]]) -> None: + import aiohttp + timeout = aiohttp.ClientTimeout(total=self._timeout) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post( + LINE_REPLY_URL, + headers=self._headers, + json={"replyToken": reply_token, "messages": messages}, + ) as resp: + if resp.status >= 400: + body = await resp.text() + raise RuntimeError(f"LINE reply {resp.status}: {body[:200]}") + + async def push(self, chat_id: str, messages: List[Dict[str, Any]]) -> None: + import aiohttp + timeout = aiohttp.ClientTimeout(total=self._timeout) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post( + LINE_PUSH_URL, + headers=self._headers, + json={"to": chat_id, "messages": messages}, + ) as resp: + if resp.status >= 400: + body = await resp.text() + raise RuntimeError(f"LINE push {resp.status}: {body[:200]}") + + async def loading(self, chat_id: str, seconds: int = 60) -> None: + """Loading indicator (DM only). LINE rejects this for groups/rooms.""" + if not chat_id or not chat_id.startswith("U"): + return + import aiohttp + # LINE caps loadingSeconds in 5-step increments, max 60. + clamped = max(5, min(60, (seconds // 5) * 5 or 5)) + try: + timeout = aiohttp.ClientTimeout(total=5.0) + async with aiohttp.ClientSession(timeout=timeout) as session: + await session.post( + LINE_LOADING_URL, + headers=self._headers, + json={"chatId": chat_id, "loadingSeconds": clamped}, + ) + except Exception as exc: # best-effort; never raise + logger.debug("LINE loading indicator failed: %s", exc) + + async def fetch_content(self, message_id: str) -> bytes: + """Download an inbound media message's binary content.""" + import aiohttp + url = LINE_CONTENT_URL_FMT.format(message_id=message_id) + timeout = aiohttp.ClientTimeout(total=30.0) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(url, headers={"Authorization": f"Bearer {self._token}"}) as resp: + if resp.status >= 400: + raise RuntimeError(f"LINE content {resp.status}") + return await resp.read() + + async def get_bot_user_id(self) -> Optional[str]: + """Fetch this channel's own userId so we can filter self-messages.""" + import aiohttp + timeout = aiohttp.ClientTimeout(total=10.0) + try: + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(LINE_BOT_INFO_URL, headers=self._headers) as resp: + if resp.status >= 400: + return None + data = await resp.json() + return data.get("userId") + except Exception: + return None + + +# --------------------------------------------------------------------------- +# Message builders +# --------------------------------------------------------------------------- + +def _text_message(text: str) -> Dict[str, Any]: + """Build a LINE text message object, capped to per-bubble max.""" + if len(text) > LINE_PER_BUBBLE_CHARS: + text = text[: LINE_PER_BUBBLE_CHARS - 1] + "…" + return {"type": "text", "text": text} + + +def _image_message(original_url: str, preview_url: Optional[str] = None) -> Dict[str, Any]: + return { + "type": "image", + "originalContentUrl": original_url, + "previewImageUrl": preview_url or original_url, + } + + +def _audio_message(url: str, duration_ms: int = 1000) -> Dict[str, Any]: + return { + "type": "audio", + "originalContentUrl": url, + "duration": int(duration_ms), + } + + +def _video_message(url: str, preview_url: str) -> Dict[str, Any]: + return { + "type": "video", + "originalContentUrl": url, + "previewImageUrl": preview_url, + } + + +def build_postback_button_message( + text: str, button_label: str, request_id: str +) -> Dict[str, Any]: + """Template Buttons message — the slow-LLM postback bubble. + + From PR #18153 (leepoweii). Template Buttons stay tappable from chat + history, unlike Quick Reply chips which are dismissed the moment any + new message arrives in the chat. + + LINE limits: ``text`` ≤ 160 chars, ``altText`` ≤ 400 chars. + """ + truncated = text if len(text) <= 160 else text[:157] + "..." + alt = text if len(text) <= 400 else text[:397] + "..." + return { + "type": "template", + "altText": alt, + "template": { + "type": "buttons", + "text": truncated, + "actions": [ + { + "type": "postback", + "label": button_label[:20] or "Get answer", + "data": json.dumps( + {"action": "show_response", "request_id": request_id} + ), + "displayText": button_label[:300] or "Get answer", + } + ], + }, + } + + +# Prefixes the gateway uses for system busy-acks (interrupting / queued / +# steered). When the postback cache has a PENDING entry we *bypass* the +# cache for these so they reach the user as visible bubbles instead of +# being silently swallowed. From PR #18153. +_SYSTEM_BYPASS_PREFIXES: Tuple[str, ...] = ( + "⚡ Interrupting", + "⏳ Queued", + "⏩ Steered", + "💾", # background-review summary +) + + +def _is_system_bypass(content: str) -> bool: + if not content: + return False + return any(content.startswith(p) for p in _SYSTEM_BYPASS_PREFIXES) + + +# --------------------------------------------------------------------------- +# Configuration helpers +# --------------------------------------------------------------------------- + +def _csv_set(value: str) -> Set[str]: + if not value: + return set() + return {x.strip() for x in value.split(",") if x.strip()} + + +def _truthy_env(name: str, default: bool = False) -> bool: + v = os.getenv(name) + if v is None: + return default + return v.strip().lower() in ("1", "true", "yes", "on") + + +# --------------------------------------------------------------------------- +# Adapter +# --------------------------------------------------------------------------- + +class LineAdapter(BasePlatformAdapter): + """LINE Messaging API gateway adapter.""" + + # LINE has its own message-edit story (none) — we always send fresh + # bubbles, never edit, so REQUIRES_EDIT_FINALIZE stays False. + + def __init__(self, config, **kwargs): + platform = Platform("line") + super().__init__(config=config, platform=platform) + + extra = getattr(config, "extra", {}) or {} + + # Credentials + self.channel_access_token = ( + os.getenv("LINE_CHANNEL_ACCESS_TOKEN") + or extra.get("channel_access_token", "") + ) + self.channel_secret = ( + os.getenv("LINE_CHANNEL_SECRET") + or extra.get("channel_secret", "") + ) + + # Webhook server + self.webhook_host = os.getenv("LINE_HOST") or extra.get("host", "0.0.0.0") + try: + self.webhook_port = int( + os.getenv("LINE_PORT") or extra.get("port", DEFAULT_WEBHOOK_PORT) + ) + except (TypeError, ValueError): + self.webhook_port = DEFAULT_WEBHOOK_PORT + self.webhook_path = extra.get("webhook_path", DEFAULT_WEBHOOK_PATH) + + # Public base URL — required for media sending when bind isn't + # publicly reachable. + self.public_base_url = ( + os.getenv("LINE_PUBLIC_URL") + or extra.get("public_url", "") + or "" + ).rstrip("/") + + # Three-allowlist gating + self.allow_all = _truthy_env( + "LINE_ALLOW_ALL_USERS", bool(extra.get("allow_all_users", False)) + ) + self.allowed_users = _csv_set( + os.getenv("LINE_ALLOWED_USERS", "") + ) | set(extra.get("allowed_users", [])) + self.allowed_groups = _csv_set( + os.getenv("LINE_ALLOWED_GROUPS", "") + ) | set(extra.get("allowed_groups", [])) + self.allowed_rooms = _csv_set( + os.getenv("LINE_ALLOWED_ROOMS", "") + ) | set(extra.get("allowed_rooms", [])) + + # Slow-LLM postback button threshold + try: + self.slow_response_threshold = float( + os.getenv("LINE_SLOW_RESPONSE_THRESHOLD") + or extra.get("slow_response_threshold", DEFAULT_SLOW_RESPONSE_THRESHOLD) + ) + except (TypeError, ValueError): + self.slow_response_threshold = DEFAULT_SLOW_RESPONSE_THRESHOLD + + # User-overridable copy + self.pending_text = ( + os.getenv("LINE_PENDING_TEXT") + or extra.get("pending_text", DEFAULT_PENDING_REPLY_TEXT) + ) + self.button_label = ( + os.getenv("LINE_BUTTON_LABEL") + or extra.get("button_label", DEFAULT_BUTTON_LABEL) + ) + self.delivered_text = ( + os.getenv("LINE_DELIVERED_TEXT") + or extra.get("delivered_text", DEFAULT_DELIVERED_TEXT) + ) + self.interrupted_text = ( + os.getenv("LINE_INTERRUPTED_TEXT") + or extra.get("interrupted_text", DEFAULT_INTERRUPTED_TEXT) + ) + + # Runtime state + self._client: Optional[_LineClient] = None + self._app = None # aiohttp.web.Application + self._runner = None # aiohttp.web.AppRunner + self._site = None # aiohttp.web.TCPSite + self._reply_tokens: Dict[str, Tuple[str, float]] = {} # chat_id → (token, expiry) + self._cache = RequestCache() + self._dedup = _MessageDeduplicator() + self._bot_user_id: Optional[str] = None + self._lock_key: Optional[str] = None + + # Media state + self._media_tokens: Dict[str, Tuple[str, float]] = {} # token → (path, expiry) + self._media_temp_paths: Set[str] = set() + self._media_ttl = MEDIA_TOKEN_TTL_SECONDS + + # Pending-button slot per chat — ensures one outstanding postback + # button per chat at a time. Postback cache request_id keyed by chat_id. + self._pending_buttons: Dict[str, str] = {} + + # ------------------------------------------------------------------ + # Connection lifecycle + # ------------------------------------------------------------------ + + async def connect(self) -> bool: + if not self.channel_access_token or not self.channel_secret: + self._set_fatal_error( + "config_missing", + "LINE_CHANNEL_ACCESS_TOKEN and LINE_CHANNEL_SECRET must be set", + retryable=False, + ) + return False + + # Prevent two profiles from running on the same channel access token. + try: + from gateway.status import acquire_scoped_lock + # Use a hash of the token so we don't write the secret to disk. + tok_hash = hashlib.sha256(self.channel_access_token.encode()).hexdigest()[:16] + if not acquire_scoped_lock("line", tok_hash): + self._set_fatal_error( + "lock_conflict", + "LINE channel already in use by another profile", + retryable=False, + ) + return False + self._lock_key = tok_hash + except ImportError: + self._lock_key = None + + self._client = _LineClient(self.channel_access_token) + + # Best-effort: fetch our own bot userId for self-message filtering. + # If the call fails (offline tests, transient 5xx) we fall back to + # not filtering self-events; the cost is minor (LINE doesn't + # actually echo our own messages back). + try: + self._bot_user_id = await self._client.get_bot_user_id() + except Exception as exc: + logger.debug("LINE: get_bot_user_id failed: %s", exc) + self._bot_user_id = None + + # Spin up the aiohttp webhook server. + try: + from aiohttp import web + except ImportError: + self._set_fatal_error( + "missing_dep", + "aiohttp is required for the LINE adapter — install with `pip install aiohttp`", + retryable=False, + ) + return False + + self._app = web.Application(client_max_size=WEBHOOK_BODY_MAX_BYTES) + self._app.router.add_post(self.webhook_path, self._handle_webhook) + # Public health probe — useful for tunnel/proxy verification. + self._app.router.add_get(f"{self.webhook_path}/health", self._handle_health) + # Media serving endpoint. + self._app.router.add_get( + f"{DEFAULT_MEDIA_PATH_PREFIX}/{{token}}/{{filename}}", + self._handle_media, + ) + + self._runner = web.AppRunner(self._app) + try: + await self._runner.setup() + self._site = web.TCPSite(self._runner, self.webhook_host, self.webhook_port) + await self._site.start() + except OSError as exc: + self._set_fatal_error( + "bind_failed", + f"Could not bind LINE webhook on {self.webhook_host}:{self.webhook_port}: {exc}", + retryable=True, + ) + return False + + self._mark_connected() + logger.info( + "LINE: webhook listening on %s:%s%s%s", + self.webhook_host, + self.webhook_port, + self.webhook_path, + f" (public: {self.public_base_url})" if self.public_base_url else "", + ) + return True + + async def disconnect(self) -> None: + self._mark_disconnected() + + if self._site is not None: + try: + await self._site.stop() + except Exception: + pass + self._site = None + if self._runner is not None: + try: + await self._runner.cleanup() + except Exception: + pass + self._runner = None + self._app = None + + # Cleanup any tracked tempfiles. + for path in list(self._media_temp_paths): + try: + os.unlink(path) + except OSError: + pass + self._media_temp_paths.clear() + self._media_tokens.clear() + + if self._lock_key: + try: + from gateway.status import release_scoped_lock + release_scoped_lock("line", self._lock_key) + except Exception: + pass + self._lock_key = None + + # ------------------------------------------------------------------ + # Webhook handlers + # ------------------------------------------------------------------ + + async def _handle_health(self, request) -> Any: + from aiohttp import web + return web.json_response({"status": "ok", "platform": "line"}) + + async def _handle_webhook(self, request) -> Any: + from aiohttp import web + + # Body cap defends against memory-exhaustion via crafted Content-Length + # (aiohttp's client_max_size only applies to certain body modes). + try: + body = await request.read() + except Exception as exc: + logger.debug("LINE: read failed: %s", exc) + return web.Response(status=400, text="bad request") + if len(body) > WEBHOOK_BODY_MAX_BYTES: + return web.Response(status=413, text="payload too large") + + signature = request.headers.get("X-Line-Signature", "") + if not verify_line_signature(body, signature, self.channel_secret): + return web.Response(status=401, text="invalid signature") + + try: + payload = json.loads(body.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError): + return web.Response(status=400, text="bad json") + + events = payload.get("events", []) or [] + for event in events: + try: + await self._dispatch_event(event) + except Exception: + logger.exception("LINE: dispatch_event failed") + + return web.Response(status=200, text="ok") + + async def _dispatch_event(self, event: Dict[str, Any]) -> None: + event_type = event.get("type") + source = event.get("source") or {} + webhook_event_id = event.get("webhookEventId", "") or "" + + # Dedup retries (LINE webhooks may be re-delivered). + if webhook_event_id and self._dedup.is_duplicate(webhook_event_id): + logger.debug("LINE: ignoring duplicate webhook event %s", webhook_event_id) + return + + # Filter our own messages (self-echo). + sender_user_id = source.get("userId", "") + if self._bot_user_id and sender_user_id == self._bot_user_id: + return + + # Allowlist gate. + if not _allowed_for_source( + source, + allow_all=self.allow_all, + user_ids=self.allowed_users, + group_ids=self.allowed_groups, + room_ids=self.allowed_rooms, + ): + logger.info("LINE: rejecting unauthorized source %s", source) + return + + if event_type == "message": + await self._handle_message_event(event) + elif event_type == "postback": + await self._handle_postback_event(event) + elif event_type in ("follow", "unfollow", "join", "leave"): + logger.info("LINE: lifecycle event %s from %s", event_type, source) + else: + logger.debug("LINE: ignoring event type %r", event_type) + + async def _handle_message_event(self, event: Dict[str, Any]) -> None: + msg = event.get("message") or {} + msg_type = msg.get("type", "") + message_id = msg.get("id", "") + reply_token = event.get("replyToken", "") + source = event.get("source") or {} + chat_id, chat_type = _resolve_chat(source) + user_id = source.get("userId", "") or chat_id + + # Stash the reply token for outbound use. + if chat_id and reply_token: + self._reply_tokens[chat_id] = ( + reply_token, + time.time() + LINE_REPLY_TOKEN_TTL_SECONDS, + ) + + # Handle media inbound — fetch the binary, cache it, and surface a + # vision-tool-friendly local path on the MessageEvent. + media_urls: List[str] = [] + media_types: List[str] = [] + text = "" + + if msg_type == "text": + text = msg.get("text", "") or "" + elif msg_type in ("image", "audio", "video", "file"): + local_path = await self._download_media(message_id, msg_type) + if local_path: + media_urls.append(local_path) + media_types.append(msg_type) + text = f"[{msg_type}]" + elif msg_type == "sticker": + keywords = msg.get("keywords") or [] + text = f"[sticker: {', '.join(keywords)}]" if keywords else "[sticker]" + elif msg_type == "location": + title = msg.get("title", "") + address = msg.get("address", "") + text = f"[location: {title} {address}]".strip() + else: + text = f"[unsupported message type: {msg_type}]" + + # Best-effort typing indicator (DM only). + if chat_type == "dm" and self._client: + asyncio.create_task(self._client.loading(chat_id)) + + source_obj = self.create_source( + chat_id=chat_id, + chat_type=chat_type, + user_id=user_id, + user_name=user_id, + chat_name=chat_id, + ) + + event_obj = MessageEvent( + text=text, + message_type=MessageType.TEXT if msg_type == "text" else MessageType.IMAGE, + source=source_obj, + raw_message=event, + message_id=message_id, + media_urls=media_urls, + media_types=media_types, + ) + + await self.handle_message(event_obj) + + async def _handle_postback_event(self, event: Dict[str, Any]) -> None: + """User tapped the slow-LLM postback button — deliver cached payload.""" + postback = event.get("postback") or {} + data = postback.get("data", "") or "" + reply_token = event.get("replyToken", "") + source = event.get("source") or {} + chat_id, _ = _resolve_chat(source) + + try: + parsed = json.loads(data) + except (TypeError, json.JSONDecodeError): + return + + if parsed.get("action") != "show_response": + return + request_id = parsed.get("request_id", "") + if not request_id: + return + + entry = self._cache.get(request_id) + if not self._client or not reply_token or not entry: + return + + if entry.state is State.READY: + payload = entry.payload or "" + chunks = split_for_line(strip_markdown_preserving_urls(str(payload))) + messages = [_text_message(c) for c in chunks][:LINE_MAX_MESSAGES_PER_CALL] + try: + await self._client.reply(reply_token, messages) + self._cache.mark_delivered(request_id) + self._pending_buttons.pop(chat_id, None) + except Exception as exc: + logger.warning("LINE: postback reply failed (%s); falling back to push", exc) + try: + await self._client.push(chat_id, messages) + self._cache.mark_delivered(request_id) + self._pending_buttons.pop(chat_id, None) + except Exception as exc2: + logger.error("LINE: postback push fallback failed: %s", exc2) + elif entry.state is State.ERROR: + text = str(entry.payload or self.interrupted_text) + try: + await self._client.reply(reply_token, [_text_message(text)]) + self._cache.mark_delivered(request_id) + self._pending_buttons.pop(chat_id, None) + except Exception as exc: + logger.warning("LINE: postback ERROR reply failed: %s", exc) + elif entry.state is State.DELIVERED: + try: + await self._client.reply(reply_token, [_text_message(self.delivered_text)]) + except Exception: + pass + elif entry.state is State.PENDING: + # Still working — re-issue the wait notice. + try: + await self._client.reply(reply_token, [_text_message(self.pending_text)]) + except Exception: + pass + + async def _download_media(self, message_id: str, msg_type: str) -> Optional[str]: + if not self._client or not message_id: + return None + try: + data = await self._client.fetch_content(message_id) + except Exception as exc: + logger.warning("LINE: failed to fetch %s content for %s: %s", msg_type, message_id, exc) + return None + ext = { + "image": ".jpg", + "audio": ".m4a", + "video": ".mp4", + "file": ".bin", + }.get(msg_type, ".bin") + try: + return cache_image_from_bytes(data, ext=ext) + except Exception as exc: + logger.warning("LINE: failed to cache %s payload: %s", msg_type, exc) + return None + + # ------------------------------------------------------------------ + # Outbound send (text) + # ------------------------------------------------------------------ + + async def send( + self, + chat_id: str, + content: str, + reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + if not self._client: + return SendResult(success=False, error="LINE adapter not connected") + + # System busy-acks (interrupting / queued / steered) bypass the + # postback cache and route directly to LINE so they reach the user + # as visible bubbles. Source: PR #18153. + if _is_system_bypass(content): + return await self._send_text_chunks(chat_id, content, force_push=False) + + # If the chat has a PENDING postback button outstanding, route the + # response into the cache for the user to fetch via tap. + pending_rid = self._pending_buttons.get(chat_id) + if pending_rid: + self._cache.set_ready(pending_rid, content) + return SendResult(success=True, message_id=pending_rid) + + return await self._send_text_chunks(chat_id, content, force_push=False) + + async def _send_text_chunks( + self, + chat_id: str, + content: str, + *, + force_push: bool, + ) -> SendResult: + if not self._client: + return SendResult(success=False, error="LINE adapter not connected") + + chunks = split_for_line(strip_markdown_preserving_urls(content)) + if not chunks: + return SendResult(success=True, message_id=None) + messages = [_text_message(c) for c in chunks][:LINE_MAX_MESSAGES_PER_CALL] + + token, used_reply = self._consume_reply_token(chat_id) + if used_reply and not force_push: + try: + await self._client.reply(token, messages) + return SendResult(success=True, message_id=token) + except Exception as exc: + logger.info("LINE: reply token rejected (%s); falling back to push", exc) + # fall through to push + + try: + await self._client.push(chat_id, messages) + return SendResult(success=True, message_id=None) + except Exception as exc: + logger.error("LINE: push send failed: %s", exc) + return SendResult(success=False, error=str(exc)) + + def _consume_reply_token(self, chat_id: str) -> Tuple[str, bool]: + """Consume a stashed reply token if present and unexpired. + + Returns ``(token, used_reply)``. + """ + entry = self._reply_tokens.pop(chat_id, None) + if not entry: + return "", False + token, expires_at = entry + if not token or time.time() >= expires_at: + return "", False + return token, True + + async def send_typing(self, chat_id: str, metadata=None) -> None: + """Trigger LINE's loading-animation indicator (DM only).""" + if self._client and chat_id: + await self._client.loading(chat_id) + + async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: + """Best-effort chat info derived from the chat_id prefix. + + LINE's chat-info APIs are limited and per-source-type — instead of + chasing them we infer from the well-known ID prefixes: + ``U`` = user (1:1), ``C`` = group, ``R`` = room. The agent only + needs ``name`` + ``type`` from this method. + """ + prefix = (chat_id or "")[:1] + chat_type = {"U": "dm", "C": "group", "R": "channel"}.get(prefix, "dm") + return {"name": chat_id or "", "type": chat_type} + + def format_message(self, content: str) -> str: + """Strip Markdown that LINE can't render. URLs are preserved.""" + return strip_markdown_preserving_urls(content) + + # ------------------------------------------------------------------ + # Slow-LLM postback button — driven by _keep_typing + # ------------------------------------------------------------------ + + async def _keep_typing(self, chat_id: str, *args, **kwargs) -> None: + """Override the base loop to fire the postback button at threshold. + + We intentionally keep the base implementation behind us: it's + responsible for the typing-indicator heartbeat, while *this* + wrapper layers in the slow-LLM postback bubble at threshold. + """ + if ( + self.slow_response_threshold <= 0 + or not self._client + or not chat_id + ): + await super()._keep_typing(chat_id, *args, **kwargs) + return + + async def _fire_postback() -> None: + try: + await asyncio.sleep(self.slow_response_threshold) + except asyncio.CancelledError: + raise + # Only fire if we still have a usable reply token. If the agent + # already responded, _consume_reply_token has cleared it. + if chat_id not in self._reply_tokens: + return + if chat_id in self._pending_buttons: + return + rid = self._cache.register_pending(chat_id) + self._pending_buttons[chat_id] = rid + token, used = self._consume_reply_token(chat_id) + if not used: + self._pending_buttons.pop(chat_id, None) + return + msg = build_postback_button_message( + self.pending_text, self.button_label, rid + ) + try: + await self._client.reply(token, [msg]) + logger.info("LINE: sent slow-LLM postback button for chat %s (rid=%s)", chat_id, rid) + except Exception as exc: + logger.warning("LINE: postback button send failed: %s", exc) + self._pending_buttons.pop(chat_id, None) + + post_task = asyncio.create_task(_fire_postback()) + try: + await super()._keep_typing(chat_id, *args, **kwargs) + finally: + if not post_task.done(): + post_task.cancel() + try: + await post_task + except (asyncio.CancelledError, Exception): + pass + + async def interrupt_session_activity(self, session_key: str, chat_id: str) -> None: + """Resolve any orphan PENDING postback so the button doesn't loop.""" + await super().interrupt_session_activity(session_key, chat_id) + rid = self._pending_buttons.pop(chat_id, None) + if rid: + self._cache.set_error(rid, self.interrupted_text) + + # ------------------------------------------------------------------ + # Outbound media (image / voice / video) + # ------------------------------------------------------------------ + + def _register_media(self, file_path: str, *, cleanup: bool = False) -> str: + """Register a local file for HTTPS serving; return the URL token.""" + # Evict expired tokens first. + now = time.time() + for token in list(self._media_tokens.keys()): + path, exp = self._media_tokens[token] + if now > exp: + self._media_tokens.pop(token, None) + if path in self._media_temp_paths: + self._media_temp_paths.discard(path) + try: + os.unlink(path) + except OSError: + pass + + resolved = str(Path(file_path).resolve()) + token = secrets.token_urlsafe(32) + self._media_tokens[token] = (resolved, now + self._media_ttl) + if cleanup: + self._media_temp_paths.add(resolved) + return token + + def _media_url(self, token: str, filename: str) -> str: + """Build the public HTTPS URL for a media token. PR #8398 style.""" + if self.public_base_url: + base = self.public_base_url + else: + host = self.webhook_host + port = self.webhook_port + if port == 443: + base = f"https://{host}" + else: + base = f"https://{host}:{port}" + safe_name = _urlquote(filename, safe="") + return f"{base}{DEFAULT_MEDIA_PATH_PREFIX}/{token}/{safe_name}" + + async def _handle_media(self, request) -> Any: + """Serve a registered local file over HTTPS for LINE's media URLs. + + Defence-in-depth: even though ``_register_media`` is only called + from trusted internal code, we recheck the resolved path against + an allowed-roots set before serving. Sources allowed: + ``tempfile.gettempdir()``, ``/tmp`` (which resolves to + ``/private/tmp`` on macOS), and ``HERMES_HOME``. PR #8398. + """ + from aiohttp import web + + token = request.match_info["token"] + entry = self._media_tokens.get(token) + if not entry: + return web.Response(status=404, text="not found") + + file_path, expires_at = entry + if time.time() > expires_at: + self._media_tokens.pop(token, None) + return web.Response(status=410, text="gone") + + path = Path(file_path) + if not path.exists() or not path.is_file(): + return web.Response(status=404, text="not found") + + try: + from hermes_constants import get_hermes_home + hermes_home = Path(get_hermes_home()).resolve() + except Exception: + hermes_home = Path.home().joinpath(".hermes").resolve() + + allowed_roots = { + Path(tempfile.gettempdir()).resolve(), + Path("/tmp").resolve(), # → /private/tmp on macOS + hermes_home, + } + resolved = path.resolve() + if not any(_is_relative_to(resolved, r) for r in allowed_roots): + logger.warning("LINE: refusing to serve outside allowed roots: %s", resolved) + return web.Response(status=403, text="forbidden") + + content_type, _ = mimetypes.guess_type(str(path)) + return web.FileResponse( + path, + headers={"Content-Type": content_type or "application/octet-stream"}, + ) + + async def send_image_file( + self, + chat_id: str, + image_path: str, + caption: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + path = Path(image_path) + if not path.exists() or not path.is_file(): + return SendResult(success=False, error=f"image file not found: {image_path}") + if path.stat().st_size > LINE_IMAGE_MAX_BYTES: + return SendResult(success=False, error="image exceeds 10 MB LINE limit") + if not self._client: + return SendResult(success=False, error="LINE adapter not connected") + if not self.public_base_url and self.webhook_host == "0.0.0.0": + return SendResult( + success=False, + error="LINE_PUBLIC_URL must be set to send images " + "(LINE only accepts publicly reachable HTTPS URLs)", + ) + + token = self._register_media(str(path.resolve())) + url = self._media_url(token, path.name) + if not url.lower().startswith("https://"): + return SendResult(success=False, error=f"LINE image URL must be HTTPS: {url}") + msgs: List[Dict[str, Any]] = [_image_message(url)] + if caption: + msgs.append(_text_message(caption)) + return await self._send_messages(chat_id, msgs) + + async def send_voice( + self, + chat_id: str, + audio_path: str, + duration_ms: int = 1000, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + path = Path(audio_path) + if not path.exists() or not path.is_file(): + return SendResult(success=False, error=f"audio file not found: {audio_path}") + if path.stat().st_size > LINE_AV_MAX_BYTES: + return SendResult(success=False, error="audio exceeds 200 MB LINE limit") + if not self._client: + return SendResult(success=False, error="LINE adapter not connected") + if not self.public_base_url and self.webhook_host == "0.0.0.0": + return SendResult( + success=False, + error="LINE_PUBLIC_URL must be set to send audio", + ) + + token = self._register_media(str(path.resolve())) + url = self._media_url(token, path.name) + return await self._send_messages(chat_id, [_audio_message(url, duration_ms)]) + + async def send_video( + self, + chat_id: str, + video_path: str, + preview_path: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + path = Path(video_path) + if not path.exists() or not path.is_file(): + return SendResult(success=False, error=f"video file not found: {video_path}") + if path.stat().st_size > LINE_AV_MAX_BYTES: + return SendResult(success=False, error="video exceeds 200 MB LINE limit") + if not self._client: + return SendResult(success=False, error="LINE adapter not connected") + if not self.public_base_url and self.webhook_host == "0.0.0.0": + return SendResult( + success=False, + error="LINE_PUBLIC_URL must be set to send video", + ) + + # LINE requires a previewImageUrl. Use one if supplied, otherwise + # write a stdlib 1×1 PNG to /tmp and serve it. PR #8398. + if preview_path and Path(preview_path).is_file(): + preview_token = self._register_media(str(Path(preview_path).resolve())) + preview_filename = Path(preview_path).name + else: + tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False) + try: + tmp.write(_FALLBACK_PNG_PREVIEW) + tmp.flush() + tmp.close() + preview_token = self._register_media(tmp.name, cleanup=True) + preview_filename = "preview.png" + except Exception: + try: + os.unlink(tmp.name) + except OSError: + pass + raise + + video_token = self._register_media(str(path.resolve())) + video_url = self._media_url(video_token, path.name) + preview_url = self._media_url(preview_token, preview_filename) + return await self._send_messages(chat_id, [_video_message(video_url, preview_url)]) + + async def _send_messages( + self, + chat_id: str, + messages: List[Dict[str, Any]], + ) -> SendResult: + """Send already-built message objects, batched at 5/call.""" + if not self._client: + return SendResult(success=False, error="LINE adapter not connected") + if not messages: + return SendResult(success=True, message_id=None) + + first_batch = messages[:LINE_MAX_MESSAGES_PER_CALL] + rest = messages[LINE_MAX_MESSAGES_PER_CALL:] + + # First batch: try reply token, fall back to push. + token, used_reply = self._consume_reply_token(chat_id) + if used_reply: + try: + await self._client.reply(token, first_batch) + except Exception as exc: + logger.info("LINE: reply token rejected (%s); falling back to push", exc) + try: + await self._client.push(chat_id, first_batch) + except Exception as exc2: + return SendResult(success=False, error=str(exc2)) + else: + try: + await self._client.push(chat_id, first_batch) + except Exception as exc: + return SendResult(success=False, error=str(exc)) + + # Subsequent batches: always push (reply token is single-use). + while rest: + batch = rest[:LINE_MAX_MESSAGES_PER_CALL] + rest = rest[LINE_MAX_MESSAGES_PER_CALL:] + try: + await self._client.push(chat_id, batch) + except Exception as exc: + logger.warning("LINE: push for follow-up batch failed: %s", exc) + return SendResult(success=False, error=str(exc)) + + return SendResult(success=True, message_id=None) + + +def _is_relative_to(child: Path, parent: Path) -> bool: + """Backport for Path.is_relative_to (Python 3.9+) — defensive against + cwd-resolution differences across CI runners.""" + try: + return child.resolve().is_relative_to(parent.resolve()) + except (AttributeError, ValueError): + try: + child.resolve().relative_to(parent.resolve()) + return True + except ValueError: + return False + + +# --------------------------------------------------------------------------- +# Plugin entry-point hooks +# --------------------------------------------------------------------------- + +def check_requirements() -> bool: + """Plugin gate: require credentials AND aiohttp at runtime.""" + if not os.getenv("LINE_CHANNEL_ACCESS_TOKEN"): + return False + if not os.getenv("LINE_CHANNEL_SECRET"): + return False + try: + import aiohttp # noqa: F401 + except ImportError: + return False + return True + + +def validate_config(config) -> bool: + extra = getattr(config, "extra", {}) or {} + has_token = bool( + os.getenv("LINE_CHANNEL_ACCESS_TOKEN") or extra.get("channel_access_token") + ) + has_secret = bool( + os.getenv("LINE_CHANNEL_SECRET") or extra.get("channel_secret") + ) + return has_token and has_secret + + +def is_connected(config) -> bool: + """Surface in ``hermes status`` even before the adapter is instantiated.""" + return validate_config(config) + + +def _env_enablement() -> Optional[Dict[str, Any]]: + """Auto-seed PlatformConfig.extra from env-only setups. + + Lets ``hermes status`` reflect a LINE configuration that lives entirely + in ``.env`` without a ``platforms.line`` block in ``config.yaml``. + Mirrors the IRC plugin's pattern. + """ + if not (os.getenv("LINE_CHANNEL_ACCESS_TOKEN") and os.getenv("LINE_CHANNEL_SECRET")): + return None + seeded: Dict[str, Any] = {} + if os.getenv("LINE_PORT"): + try: + seeded["port"] = int(os.environ["LINE_PORT"]) + except ValueError: + pass + if os.getenv("LINE_HOST"): + seeded["host"] = os.environ["LINE_HOST"] + if os.getenv("LINE_PUBLIC_URL"): + seeded["public_url"] = os.environ["LINE_PUBLIC_URL"] + if os.getenv("LINE_HOME_CHANNEL"): + seeded["home_channel"] = os.environ["LINE_HOME_CHANNEL"] + return seeded or {} + + +async def _standalone_send( + pconfig, + chat_id: str, + message: str, + *, + thread_id: Optional[str] = None, + media_files: Optional[List[str]] = None, + force_document: bool = False, +) -> Dict[str, Any]: + """Out-of-process push delivery for cron jobs running detached from the gateway. + + Without this hook ``deliver=line`` cron jobs fail with ``no live adapter`` + when cron runs as its own process. We always Push (reply tokens require + an inbound webhook event we don't have in this path). + + ``thread_id`` is accepted for signature parity but ignored — LINE has + no native thread primitive on the channel-side API. ``media_files`` + likewise: cron-side media delivery requires a publicly-reachable URL, + which the standalone path can't construct without binding the webhook + server, so we send a text reference instead. + """ + extra = getattr(pconfig, "extra", {}) or {} + token = ( + os.getenv("LINE_CHANNEL_ACCESS_TOKEN") + or extra.get("channel_access_token", "") + ) + if not token or not chat_id: + return {"error": "LINE standalone send: missing token or chat_id"} + + plain = strip_markdown_preserving_urls(message or "") + chunks = split_for_line(plain) or [""] + messages = [_text_message(c) for c in chunks][:LINE_MAX_MESSAGES_PER_CALL] + if media_files: + # Tack on a hint so the recipient knows media was generated but not delivered. + messages.append(_text_message(f"[{len(media_files)} attachment(s) generated; not deliverable from cron]")) + messages = messages[:LINE_MAX_MESSAGES_PER_CALL] + + client = _LineClient(token) + try: + await client.push(chat_id, messages) + return {"success": True, "message_id": None} + except Exception as exc: + return {"error": str(exc)} + + +def interactive_setup() -> None: + """Minimal stdin wizard for ``hermes setup line``. + + Mirrors the irc/teams style: prompts for the two required vars, plus + one optional public URL. Writes to ``~/.hermes/.env`` via ``hermes_cli.config``. + """ + print() + print("LINE Messaging API setup") + print("------------------------") + print("Create a Messaging API channel at https://developers.line.biz/console/") + print("then copy the values below.") + print() + + try: + from hermes_cli.config import get_env_var, set_env_var + except ImportError: + print("hermes_cli.config not available; set LINE_* vars manually in ~/.hermes/.env") + return + + def _prompt(var: str, prompt: str, *, secret: bool = False) -> None: + existing = get_env_var(var) if callable(get_env_var) else None + suffix = " [keep current]" if existing else "" + try: + if secret: + import getpass + value = getpass.getpass(f"{prompt}{suffix}: ") + else: + value = input(f"{prompt}{suffix}: ").strip() + except (EOFError, KeyboardInterrupt): + print() + return + if value: + set_env_var(var, value) + + _prompt("LINE_CHANNEL_ACCESS_TOKEN", "Channel access token", secret=True) + _prompt("LINE_CHANNEL_SECRET", "Channel secret", secret=True) + _prompt("LINE_PUBLIC_URL", "Public HTTPS base URL (optional, e.g. https://my-tunnel.example.com)") + _prompt("LINE_ALLOWED_USERS", "Allowed user IDs (comma-separated; blank=skip)") + print("Done. Set the webhook URL in the LINE console to " + "/line/webhook and enable 'Use webhook'.") + + +def register(ctx) -> None: + """Plugin entry point — called by the Hermes plugin system at startup.""" + ctx.register_platform( + name="line", + label="LINE", + adapter_factory=lambda cfg: LineAdapter(cfg), + check_fn=check_requirements, + validate_config=validate_config, + is_connected=is_connected, + required_env=["LINE_CHANNEL_ACCESS_TOKEN", "LINE_CHANNEL_SECRET"], + install_hint="pip install aiohttp", + setup_fn=interactive_setup, + env_enablement_fn=_env_enablement, + cron_deliver_env_var="LINE_HOME_CHANNEL", + standalone_sender_fn=_standalone_send, + allowed_users_env="LINE_ALLOWED_USERS", + allow_all_env="LINE_ALLOW_ALL_USERS", + # LINE per-bubble cap is 5000; smart-chunker uses 4500. + max_message_length=LINE_SAFE_BUBBLE_CHARS, + emoji="💚", + pii_safe=False, + allow_update_command=True, + platform_hint=( + "You are chatting via LINE Messaging API. LINE does NOT render " + "Markdown — text bubbles show ** and # literally. Bare URLs are " + "auto-linked, but \\[label\\](url) syntax is not. Each text bubble " + "is capped at 5000 characters and at most 5 bubbles are sent per " + "reply, so keep responses concise. Image/audio/video sending " + "requires LINE_PUBLIC_URL configured to a publicly reachable HTTPS " + "host. Slow responses surface a 'Get answer' button the user taps " + "to fetch the reply via a fresh free token." + ), + ) diff --git a/plugins/platforms/line/plugin.yaml b/plugins/platforms/line/plugin.yaml new file mode 100644 index 00000000000..f854bc4e2ea --- /dev/null +++ b/plugins/platforms/line/plugin.yaml @@ -0,0 +1,65 @@ +name: line-platform +label: LINE +kind: platform +version: 1.0.0 +description: > + LINE Messaging API gateway adapter for Hermes Agent. + Runs an aiohttp webhook server that receives LINE webhook events + (with HMAC-SHA256 signature verification) and relays messages between + LINE chats (1:1, groups, rooms) and the Hermes agent. Outbound replies + prefer the free reply token and fall back to the metered Push API + when the token has expired or is absent. Slow LLM responses surface a + Template Buttons postback bubble so the user can fetch the answer with + a fresh reply token (free) once it's ready. +author: Hermes Agent contributors +# ``requires_env`` and ``optional_env`` entries are surfaced in the +# ``hermes config`` UI via the platform-plugin env var injector in +# ``hermes_cli/config.py``. +requires_env: + - name: LINE_CHANNEL_ACCESS_TOKEN + description: "LINE channel long-lived access token (LINE Developers Console > Messaging API > Channel access token)" + prompt: "LINE channel access token" + url: "https://developers.line.biz/console/" + password: true + - name: LINE_CHANNEL_SECRET + description: "LINE channel secret (used for HMAC-SHA256 webhook signature verification)" + prompt: "LINE channel secret" + url: "https://developers.line.biz/console/" + password: true +optional_env: + - name: LINE_PORT + description: "Webhook listen port (default: 8646)" + prompt: "Webhook port" + password: false + - name: LINE_HOST + description: "Webhook bind host (default: 0.0.0.0)" + prompt: "Webhook host" + password: false + - name: LINE_PUBLIC_URL + description: "Public HTTPS base URL for serving images/audio/video to LINE (e.g. https://my-tunnel.example.com). Required for media sending when the bind address is not directly reachable." + prompt: "Public HTTPS base URL" + password: false + - name: LINE_ALLOWED_USERS + description: "Comma-separated LINE user IDs allowed to DM the bot (U-prefixed)" + prompt: "Allowed user IDs (comma-separated)" + password: false + - name: LINE_ALLOWED_GROUPS + description: "Comma-separated LINE group IDs the bot will respond in (C-prefixed)" + prompt: "Allowed group IDs (comma-separated)" + password: false + - name: LINE_ALLOWED_ROOMS + description: "Comma-separated LINE room IDs the bot will respond in (R-prefixed)" + prompt: "Allowed room IDs (comma-separated)" + password: false + - name: LINE_ALLOW_ALL_USERS + description: "Allow any LINE user to talk to the bot (dev only — disables allowlist)" + prompt: "Allow all users? (true/false)" + password: false + - name: LINE_HOME_CHANNEL + description: "Default user/group/room ID for cron / notification delivery" + prompt: "Home channel ID (or empty)" + password: false + - name: LINE_SLOW_RESPONSE_THRESHOLD + description: "Seconds before the slow-LLM postback button fires (default: 45; set 0 to disable and always Push-fallback)" + prompt: "Slow response threshold (seconds)" + password: false diff --git a/scripts/release.py b/scripts/release.py index fa4444e0d93..08502bc5d52 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -138,6 +138,14 @@ AUTHOR_MAP = { "tony@tonysimons.dev": "asimons81", "jetha@google.com": "jethac", "jani@0xhoneyjar.xyz": "deep-name", + # LINE messaging plugin (synthesis PR) + "32443648+leepoweii@users.noreply.github.com": "leepoweii", + "openclaw@liyangchen.me": "liyoungc", + "charles@perng.com": "perng", + "soichiro0111.dev@gmail.com": "soichiyo", + "0xde@pieverse.io": "David-0x221Eight", + "77736378+David-0x221Eight@users.noreply.github.com": "David-0x221Eight", + "74749461+yuga-hashimoto@users.noreply.github.com": "yuga-hashimoto", "xiangyong@zspace.cn": "CES4751", "harish.kukreja@gmail.com": "counterposition", "35294173+Fearvox@users.noreply.github.com": "Fearvox", diff --git a/tests/gateway/test_line_plugin.py b/tests/gateway/test_line_plugin.py new file mode 100644 index 00000000000..e7fd2cf9946 --- /dev/null +++ b/tests/gateway/test_line_plugin.py @@ -0,0 +1,644 @@ +"""Tests for the LINE platform adapter plugin. + +Covers the seven synthesis areas from the PR review: + +1. webhook signature verification (HMAC-SHA256, base64) + tampering rejection +2. inbound chat-id resolution for user / group / room sources +3. three-allowlist gating (users / groups / rooms / allow_all) +4. inbound dedup via webhookEventId +5. RequestCache state machine (PENDING → READY → DELIVERED, ERROR) +6. Markdown stripping with URL preservation + LINE-sized chunking +7. send routing: reply token preferred → push fallback → batched at 5/call +8. register() metadata + standalone_send shape +""" + +from __future__ import annotations + +import asyncio +import hashlib +import hmac +import base64 +import json +import os +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from tests.gateway._plugin_adapter_loader import load_plugin_adapter + +# Load plugins/platforms/line/adapter.py under plugin_adapter_line so it +# cannot collide with sibling platform-plugin tests in the same xdist worker. +_line = load_plugin_adapter("line") + +verify_line_signature = _line.verify_line_signature +strip_markdown_preserving_urls = _line.strip_markdown_preserving_urls +split_for_line = _line.split_for_line +build_postback_button_message = _line.build_postback_button_message +_resolve_chat = _line._resolve_chat +_allowed_for_source = _line._allowed_for_source +_is_system_bypass = _line._is_system_bypass +RequestCache = _line.RequestCache +State = _line.State +LineAdapter = _line.LineAdapter +register = _line.register +check_requirements = _line.check_requirements +validate_config = _line.validate_config +_standalone_send = _line._standalone_send +_env_enablement = _line._env_enablement +_MessageDeduplicator = _line._MessageDeduplicator + + +# --------------------------------------------------------------------------- +# 1. Signature verification +# --------------------------------------------------------------------------- + +class TestSignature: + + def _sign(self, body: bytes, secret: str) -> str: + digest = hmac.new(secret.encode(), body, hashlib.sha256).digest() + return base64.b64encode(digest).decode() + + def test_valid_signature_passes(self): + body = b'{"events": []}' + sig = self._sign(body, "secret") + assert verify_line_signature(body, sig, "secret") + + def test_tampered_body_rejected(self): + body = b'{"events": []}' + sig = self._sign(body, "secret") + assert not verify_line_signature(body + b" ", sig, "secret") + + def test_wrong_secret_rejected(self): + body = b'{"events": []}' + sig = self._sign(body, "secret") + assert not verify_line_signature(body, sig, "different") + + def test_empty_signature_rejected(self): + assert not verify_line_signature(b"x", "", "secret") + + def test_empty_secret_rejected(self): + assert not verify_line_signature(b"x", "AAAA", "") + + def test_garbage_signature_rejected(self): + assert not verify_line_signature(b"hello", "not base64 at all!!", "s") + + +# --------------------------------------------------------------------------- +# 2. Chat-id / source resolution +# --------------------------------------------------------------------------- + +class TestSourceResolution: + + def test_user_source(self): + chat_id, ctype = _resolve_chat({"type": "user", "userId": "U123"}) + assert chat_id == "U123" + assert ctype == "dm" + + def test_group_source(self): + chat_id, ctype = _resolve_chat({"type": "group", "groupId": "C456", "userId": "U123"}) + assert chat_id == "C456" + assert ctype == "group" + + def test_room_source(self): + chat_id, ctype = _resolve_chat({"type": "room", "roomId": "R789", "userId": "U123"}) + assert chat_id == "R789" + assert ctype == "room" + + def test_unknown_source_falls_back_to_dm(self): + chat_id, ctype = _resolve_chat({"type": "weird"}) + assert chat_id == "" + assert ctype == "dm" + + def test_empty_source(self): + chat_id, ctype = _resolve_chat({}) + assert chat_id == "" + assert ctype == "dm" + + +# --------------------------------------------------------------------------- +# 3. Three-allowlist gating +# --------------------------------------------------------------------------- + +class TestAllowlist: + + def test_allow_all_short_circuits(self): + for src in [ + {"type": "user", "userId": "Ufoo"}, + {"type": "group", "groupId": "Cfoo"}, + {"type": "room", "roomId": "Rfoo"}, + ]: + assert _allowed_for_source(src, allow_all=True, user_ids=set(), group_ids=set(), room_ids=set()) + + def test_user_in_allowlist_passes(self): + src = {"type": "user", "userId": "Uok"} + assert _allowed_for_source(src, allow_all=False, user_ids={"Uok"}, group_ids=set(), room_ids=set()) + + def test_user_not_in_allowlist_rejected(self): + src = {"type": "user", "userId": "Uother"} + assert not _allowed_for_source(src, allow_all=False, user_ids={"Uok"}, group_ids=set(), room_ids=set()) + + def test_group_uses_group_list_not_user_list(self): + src = {"type": "group", "groupId": "Cok", "userId": "Uany"} + assert _allowed_for_source(src, allow_all=False, user_ids={"Uany"}, group_ids={"Cok"}, room_ids=set()) + assert not _allowed_for_source(src, allow_all=False, user_ids={"Uany"}, group_ids=set(), room_ids=set()) + + def test_room_uses_room_list(self): + src = {"type": "room", "roomId": "Rok"} + assert _allowed_for_source(src, allow_all=False, user_ids=set(), group_ids=set(), room_ids={"Rok"}) + assert not _allowed_for_source(src, allow_all=False, user_ids=set(), group_ids=set(), room_ids=set()) + + def test_unknown_type_rejected(self): + src = {"type": "weird"} + assert not _allowed_for_source(src, allow_all=False, user_ids=set(), group_ids=set(), room_ids=set()) + + +# --------------------------------------------------------------------------- +# 4. Inbound dedup +# --------------------------------------------------------------------------- + +class TestDedup: + + def test_first_event_not_duplicate(self): + d = _MessageDeduplicator() + assert not d.is_duplicate("evt1") + + def test_repeat_event_marked_duplicate(self): + d = _MessageDeduplicator() + d.is_duplicate("evt1") + assert d.is_duplicate("evt1") + + def test_blank_id_not_treated_as_duplicate(self): + d = _MessageDeduplicator() + # Blank IDs should always pass through (don't lock out unidentifiable events). + assert not d.is_duplicate("") + assert not d.is_duplicate("") + + def test_lru_eviction_under_pressure(self): + d = _MessageDeduplicator(max_size=10) + for i in range(20): + d.is_duplicate(f"evt{i}") + # Exact eviction order isn't specified, but the cap must be enforced. + # Insert one more and assert the bookkeeping doesn't grow without bound. + d.is_duplicate("evt20") + assert len(d._seen) <= 20 # bounded — exact cap depends on eviction policy + + +# --------------------------------------------------------------------------- +# 5. RequestCache state machine +# --------------------------------------------------------------------------- + +class TestRequestCache: + + def test_register_pending_is_pending(self): + c = RequestCache() + rid = c.register_pending("Uchat") + assert c.get(rid).state is State.PENDING + assert c.get(rid).chat_id == "Uchat" + + def test_set_ready_transitions(self): + c = RequestCache() + rid = c.register_pending("Uchat") + c.set_ready(rid, "the answer") + assert c.get(rid).state is State.READY + assert c.get(rid).payload == "the answer" + + def test_set_error_transitions(self): + c = RequestCache() + rid = c.register_pending("Uchat") + c.set_error(rid, "boom") + assert c.get(rid).state is State.ERROR + assert c.get(rid).payload == "boom" + + def test_mark_delivered_from_ready(self): + c = RequestCache() + rid = c.register_pending("Uchat") + c.set_ready(rid, "x") + c.mark_delivered(rid) + assert c.get(rid).state is State.DELIVERED + + def test_mark_delivered_from_error(self): + c = RequestCache() + rid = c.register_pending("Uchat") + c.set_error(rid, "x") + c.mark_delivered(rid) + assert c.get(rid).state is State.DELIVERED + + def test_set_ready_on_delivered_is_noop(self): + c = RequestCache() + rid = c.register_pending("Uchat") + c.set_ready(rid, "first") + c.mark_delivered(rid) + c.set_ready(rid, "second") + # DELIVERED is terminal — no further mutation + assert c.get(rid).payload == "first" + assert c.get(rid).state is State.DELIVERED + + def test_find_pending_for_chat(self): + c = RequestCache() + rid_a = c.register_pending("Ua") + rid_b = c.register_pending("Ub") + assert c.find_pending_for_chat("Ua") == rid_a + assert c.find_pending_for_chat("Ub") == rid_b + assert c.find_pending_for_chat("Uc") is None + c.set_ready(rid_a, "x") + # No longer PENDING — should not be found + assert c.find_pending_for_chat("Ua") is None + + +# --------------------------------------------------------------------------- +# 6. Markdown stripping + chunking +# --------------------------------------------------------------------------- + +class TestMarkdownAndChunking: + + def test_bold_stripped(self): + assert strip_markdown_preserving_urls("**hello**") == "hello" + + def test_italic_stripped(self): + assert strip_markdown_preserving_urls("*hello*") == "hello" + + def test_inline_code_unfenced(self): + assert strip_markdown_preserving_urls("run `ls -la`") == "run ls -la" + + def test_link_preserved_with_url(self): + out = strip_markdown_preserving_urls("see [here](https://x.com)") + assert "https://x.com" in out + assert "here (https://x.com)" in out + + def test_heading_prefix_stripped(self): + out = strip_markdown_preserving_urls("# Title\n## Sub") + assert out == "Title\nSub" + + def test_bullet_marker_replaced(self): + out = strip_markdown_preserving_urls("- a\n- b") + assert out == "• a\n• b" + + def test_code_fence_content_kept(self): + # Source files often contain code snippets — the agent should still + # see the content as plain text, just without backticks. + md = "```python\nprint('hi')\n```" + out = strip_markdown_preserving_urls(md) + assert "print('hi')" in out + assert "```" not in out + + def test_split_short_returns_single_chunk(self): + assert split_for_line("hi") == ["hi"] + + def test_split_long_chunks_at_paragraph_boundary(self): + text = "para1\n\npara2\n\npara3" + chunks = split_for_line(text, max_chars=8) + assert all(len(c) <= 8 for c in chunks), chunks + assert len(chunks) >= 2 + + def test_split_caps_at_five_chunks(self): + # 1000 paragraphs of 100 chars each — must cap at 5 LINE bubbles. + text = "\n\n".join(["x" * 100 for _ in range(1000)]) + chunks = split_for_line(text) + assert len(chunks) <= 5 + + +# --------------------------------------------------------------------------- +# 7. Send routing (reply -> push fallback, batching, system-bypass) +# --------------------------------------------------------------------------- + +class TestSendRouting: + + @pytest.fixture + def adapter(self, monkeypatch): + monkeypatch.delenv("LINE_CHANNEL_ACCESS_TOKEN", raising=False) + monkeypatch.delenv("LINE_CHANNEL_SECRET", raising=False) + from gateway.config import PlatformConfig + cfg = PlatformConfig(enabled=True, extra={ + "channel_access_token": "tok", + "channel_secret": "sec", + }) + ad = LineAdapter(cfg) + ad._client = MagicMock() + ad._client.reply = AsyncMock() + ad._client.push = AsyncMock() + return ad + + def test_system_bypass_recognized(self): + assert _is_system_bypass("⚡ Interrupting current run") + assert _is_system_bypass("⏳ Queued — agent is busy") + assert _is_system_bypass("⏩ Steered toward new task") + assert not _is_system_bypass("Hello world") + assert not _is_system_bypass("") + + def test_send_uses_reply_when_token_present(self, adapter): + import time as _time + adapter._reply_tokens["Uchat"] = ("rt-token", _time.time() + 30) + result = asyncio.run(adapter.send("Uchat", "hello")) + assert result.success + adapter._client.reply.assert_called_once() + adapter._client.push.assert_not_called() + # Token consumed (single-use) + assert "Uchat" not in adapter._reply_tokens + + def test_send_falls_back_to_push_when_no_token(self, adapter): + result = asyncio.run(adapter.send("Uchat", "hello")) + assert result.success + adapter._client.push.assert_called_once() + adapter._client.reply.assert_not_called() + + def test_send_falls_back_to_push_when_reply_fails(self, adapter): + import time as _time + adapter._reply_tokens["Uchat"] = ("rt-token", _time.time() + 30) + adapter._client.reply.side_effect = RuntimeError("expired") + result = asyncio.run(adapter.send("Uchat", "hello")) + assert result.success + adapter._client.reply.assert_called_once() + adapter._client.push.assert_called_once() + + def test_send_returns_failure_when_push_fails(self, adapter): + adapter._client.push.side_effect = RuntimeError("network") + result = asyncio.run(adapter.send("Uchat", "hello")) + assert not result.success + assert "network" in result.error + + def test_send_pending_button_caches_response(self, adapter): + # Simulate that the slow-LLM postback button has fired. + rid = adapter._cache.register_pending("Uchat") + adapter._pending_buttons["Uchat"] = rid + result = asyncio.run(adapter.send("Uchat", "the answer")) + assert result.success + # Response must have been cached, not pushed/replied. + adapter._client.reply.assert_not_called() + adapter._client.push.assert_not_called() + assert adapter._cache.get(rid).state is State.READY + assert adapter._cache.get(rid).payload == "the answer" + + def test_send_system_bypass_skips_postback_cache(self, adapter): + # Even with a pending button, system busy-acks must surface visibly. + rid = adapter._cache.register_pending("Uchat") + adapter._pending_buttons["Uchat"] = rid + result = asyncio.run(adapter.send("Uchat", "⚡ Interrupting current run")) + assert result.success + # Bypass goes through push (no reply token stored) + adapter._client.push.assert_called_once() + # And the cache entry is unchanged (still PENDING for the eventual answer) + assert adapter._cache.get(rid).state is State.PENDING + + def test_send_caps_messages_per_call_at_five(self, adapter): + # Build a payload that would naturally split into more than 5 LINE + # bubbles; the chunker should cap at 5 + truncate. + big = "\n\n".join(["x" * 4500 for _ in range(20)]) + result = asyncio.run(adapter.send("Uchat", big)) + assert result.success + call_kwargs = adapter._client.push.call_args + # call_args is (args, kwargs); for our send the messages are the 2nd positional + sent_messages = call_kwargs.args[1] if call_kwargs.args else call_kwargs.kwargs.get("messages") + # Without args, fall back to inspecting the call shape + if sent_messages is None: + # We invoked client.push(chat_id, messages) — check first batch + sent_messages = adapter._client.push.call_args.args[1] + assert len(sent_messages) <= 5 + + def test_format_message_strips_markdown(self, adapter): + out = adapter.format_message("**bold** [link](https://x.com)") + assert "**" not in out + assert "https://x.com" in out + + +# --------------------------------------------------------------------------- +# 8. Register() metadata + plugin entry points +# --------------------------------------------------------------------------- + +class TestRegister: + + class _FakeCtx: + def __init__(self): + self.kwargs = None + + def register_platform(self, **kw): + self.kwargs = kw + + def test_register_calls_register_platform(self): + ctx = self._FakeCtx() + register(ctx) + assert ctx.kwargs is not None + assert ctx.kwargs["name"] == "line" + assert ctx.kwargs["label"] == "LINE" + + def test_register_advertises_required_env(self): + ctx = self._FakeCtx() + register(ctx) + assert set(ctx.kwargs["required_env"]) == { + "LINE_CHANNEL_ACCESS_TOKEN", + "LINE_CHANNEL_SECRET", + } + + def test_register_wires_allowlist_envs(self): + ctx = self._FakeCtx() + register(ctx) + assert ctx.kwargs["allowed_users_env"] == "LINE_ALLOWED_USERS" + assert ctx.kwargs["allow_all_env"] == "LINE_ALLOW_ALL_USERS" + + def test_register_wires_cron_home_channel(self): + ctx = self._FakeCtx() + register(ctx) + assert ctx.kwargs["cron_deliver_env_var"] == "LINE_HOME_CHANNEL" + + def test_register_provides_standalone_sender(self): + ctx = self._FakeCtx() + register(ctx) + assert callable(ctx.kwargs["standalone_sender_fn"]) + + def test_register_provides_env_enablement(self): + ctx = self._FakeCtx() + register(ctx) + assert callable(ctx.kwargs["env_enablement_fn"]) + + def test_register_factory_yields_line_adapter(self): + ctx = self._FakeCtx() + register(ctx) + from gateway.config import PlatformConfig + cfg = PlatformConfig(enabled=True, extra={ + "channel_access_token": "tok", + "channel_secret": "sec", + }) + ad = ctx.kwargs["adapter_factory"](cfg) + assert isinstance(ad, LineAdapter) + + def test_max_message_length_below_line_per_bubble_limit(self): + ctx = self._FakeCtx() + register(ctx) + # LINE per-bubble limit is 5000; we register 4500 to leave headroom. + assert ctx.kwargs["max_message_length"] <= 5000 + + +class TestEnvEnablement: + + def test_returns_none_without_credentials(self, monkeypatch): + monkeypatch.delenv("LINE_CHANNEL_ACCESS_TOKEN", raising=False) + monkeypatch.delenv("LINE_CHANNEL_SECRET", raising=False) + assert _env_enablement() is None + + def test_returns_dict_with_credentials(self, monkeypatch): + monkeypatch.setenv("LINE_CHANNEL_ACCESS_TOKEN", "tok") + monkeypatch.setenv("LINE_CHANNEL_SECRET", "sec") + assert _env_enablement() == {} + + def test_seeds_port_from_env(self, monkeypatch): + monkeypatch.setenv("LINE_CHANNEL_ACCESS_TOKEN", "tok") + monkeypatch.setenv("LINE_CHANNEL_SECRET", "sec") + monkeypatch.setenv("LINE_PORT", "8080") + assert _env_enablement() == {"port": 8080} + + def test_seeds_public_url(self, monkeypatch): + monkeypatch.setenv("LINE_CHANNEL_ACCESS_TOKEN", "tok") + monkeypatch.setenv("LINE_CHANNEL_SECRET", "sec") + monkeypatch.setenv("LINE_PUBLIC_URL", "https://my-tunnel.example.com") + result = _env_enablement() + assert result["public_url"] == "https://my-tunnel.example.com" + + +class TestStandaloneSend: + + def test_missing_token_returns_error(self, monkeypatch): + monkeypatch.delenv("LINE_CHANNEL_ACCESS_TOKEN", raising=False) + from gateway.config import PlatformConfig + cfg = PlatformConfig(enabled=True, extra={}) + result = asyncio.run(_standalone_send(cfg, "Uchat", "hi")) + assert "error" in result + + def test_missing_chat_id_returns_error(self, monkeypatch): + monkeypatch.setenv("LINE_CHANNEL_ACCESS_TOKEN", "tok") + from gateway.config import PlatformConfig + cfg = PlatformConfig(enabled=True, extra={}) + result = asyncio.run(_standalone_send(cfg, "", "hi")) + assert "error" in result + + def test_pushes_via_client_when_credentials_present(self, monkeypatch): + from gateway.config import PlatformConfig + + push_calls = [] + + class _FakeClient: + def __init__(self, *a, **kw): + pass + + async def push(self, chat_id, messages): + push_calls.append((chat_id, messages)) + + monkeypatch.setattr(_line, "_LineClient", _FakeClient) + cfg = PlatformConfig( + enabled=True, + extra={"channel_access_token": "tok"}, + ) + result = asyncio.run(_standalone_send(cfg, "Uchat", "hello")) + assert result.get("success") is True + assert len(push_calls) == 1 + assert push_calls[0][0] == "Uchat" + # Message wraps as text bubble + assert push_calls[0][1][0]["type"] == "text" + + +class TestPostbackButtonShape: + + def test_template_buttons_structure(self): + msg = build_postback_button_message("hi", "Tap me", "rid-1") + assert msg["type"] == "template" + assert msg["template"]["type"] == "buttons" + assert msg["template"]["text"] == "hi" + actions = msg["template"]["actions"] + assert len(actions) == 1 + assert actions[0]["type"] == "postback" + data = json.loads(actions[0]["data"]) + assert data == {"action": "show_response", "request_id": "rid-1"} + + def test_text_truncated_to_160(self): + long = "x" * 200 + msg = build_postback_button_message(long, "Tap", "rid") + assert len(msg["template"]["text"]) <= 160 + + def test_alt_text_truncated_to_400(self): + long = "x" * 500 + msg = build_postback_button_message(long, "Tap", "rid") + assert len(msg["altText"]) <= 400 + + +class TestCheckRequirements: + + def test_rejects_without_token(self, monkeypatch): + monkeypatch.delenv("LINE_CHANNEL_ACCESS_TOKEN", raising=False) + monkeypatch.setenv("LINE_CHANNEL_SECRET", "s") + assert not check_requirements() + + def test_rejects_without_secret(self, monkeypatch): + monkeypatch.setenv("LINE_CHANNEL_ACCESS_TOKEN", "t") + monkeypatch.delenv("LINE_CHANNEL_SECRET", raising=False) + assert not check_requirements() + + +class TestValidateConfig: + + def test_validates_from_extra(self): + from gateway.config import PlatformConfig + cfg = PlatformConfig( + enabled=True, + extra={"channel_access_token": "t", "channel_secret": "s"}, + ) + assert validate_config(cfg) + + def test_rejects_empty_config(self, monkeypatch): + monkeypatch.delenv("LINE_CHANNEL_ACCESS_TOKEN", raising=False) + monkeypatch.delenv("LINE_CHANNEL_SECRET", raising=False) + from gateway.config import PlatformConfig + cfg = PlatformConfig(enabled=True, extra={}) + assert not validate_config(cfg) + + +class TestAdapterInit: + + def test_init_from_config_extra(self, monkeypatch): + for k in ("LINE_CHANNEL_ACCESS_TOKEN", "LINE_CHANNEL_SECRET", "LINE_PORT"): + monkeypatch.delenv(k, raising=False) + from gateway.config import PlatformConfig + cfg = PlatformConfig( + enabled=True, + extra={ + "channel_access_token": "tok", + "channel_secret": "sec", + "port": 7777, + "public_url": "https://x.example.com", + "allowed_users": ["U1", "U2"], + }, + ) + ad = LineAdapter(cfg) + assert ad.channel_access_token == "tok" + assert ad.channel_secret == "sec" + assert ad.webhook_port == 7777 + assert ad.public_base_url == "https://x.example.com" + assert ad.allowed_users == {"U1", "U2"} + + def test_env_overrides_extra(self, monkeypatch): + monkeypatch.setenv("LINE_CHANNEL_ACCESS_TOKEN", "env-tok") + monkeypatch.setenv("LINE_PORT", "1234") + from gateway.config import PlatformConfig + cfg = PlatformConfig( + enabled=True, + extra={"channel_access_token": "extra-tok", "channel_secret": "s", "port": 5555}, + ) + ad = LineAdapter(cfg) + assert ad.channel_access_token == "env-tok" + assert ad.webhook_port == 1234 + + def test_csv_allowlist_parsed(self, monkeypatch): + monkeypatch.setenv("LINE_CHANNEL_ACCESS_TOKEN", "t") + monkeypatch.setenv("LINE_CHANNEL_SECRET", "s") + monkeypatch.setenv("LINE_ALLOWED_USERS", "U1, U2,U3") + monkeypatch.setenv("LINE_ALLOWED_GROUPS", "C1") + from gateway.config import PlatformConfig + ad = LineAdapter(PlatformConfig(enabled=True)) + assert ad.allowed_users == {"U1", "U2", "U3"} + assert ad.allowed_groups == {"C1"} + + def test_get_chat_info_infers_type_from_prefix(self, monkeypatch): + monkeypatch.setenv("LINE_CHANNEL_ACCESS_TOKEN", "t") + monkeypatch.setenv("LINE_CHANNEL_SECRET", "s") + from gateway.config import PlatformConfig + ad = LineAdapter(PlatformConfig(enabled=True)) + assert asyncio.run(ad.get_chat_info("U123"))["type"] == "dm" + assert asyncio.run(ad.get_chat_info("C123"))["type"] == "group" + assert asyncio.run(ad.get_chat_info("R123"))["type"] == "channel" diff --git a/website/docs/developer-guide/adding-platform-adapters.md b/website/docs/developer-guide/adding-platform-adapters.md index 1ba4b9a34cd..f3597dfca39 100644 --- a/website/docs/developer-guide/adding-platform-adapters.md +++ b/website/docs/developer-guide/adding-platform-adapters.md @@ -322,9 +322,98 @@ optional_env: Bare-string entries (`- MY_PLATFORM_TOKEN`) still work — they get a generic description auto-derived from the plugin's `label`. If a hardcoded entry for the same var already exists in `OPTIONAL_ENV_VARS`, it wins (back-compat); the plugin.yaml form acts as the fallback. +## Platform-Specific Slow-LLM UX + +Some platforms have constraints that change how a slow LLM response should be presented: + +- **LINE** issues a single-use *reply token* that expires roughly 60 seconds after the inbound event. Replying with that token is free; falling back to the metered Push API is not. If the LLM hasn't finished by the deadline, the choice is "burn paid Push quota" or "do something cleverer with the reply token before it expires." +- **WhatsApp** marks a session inactive after 24h, after which only template messages are accepted. +- **SMS** has no concept of typing indicators or progressive updates — long responses just look like the bot is offline. + +These are real constraints the base `BasePlatformAdapter` can't anticipate. The plugin surface intentionally leaves the room for an adapter to layer platform-specific UX on top of the base typing loop without expanding the kwarg list. + +### Pattern: subclass `_keep_typing` to layer mid-flight UX + +`BasePlatformAdapter._keep_typing` is the typing-indicator heartbeat — it runs as a background task while the LLM is generating, and is cancelled when the response is delivered. To layer a platform-specific behavior at a threshold (e.g. send a "still thinking" bubble at 45s), override `_keep_typing` in your adapter, schedule your own task alongside `super()._keep_typing()`, and tear it down in `finally`: + +```python +class LineAdapter(BasePlatformAdapter): + async def _keep_typing(self, chat_id: str, *args, **kwargs) -> None: + if self.slow_response_threshold <= 0: + await super()._keep_typing(chat_id, *args, **kwargs) + return + + async def _fire_at_threshold() -> None: + try: + await asyncio.sleep(self.slow_response_threshold) + except asyncio.CancelledError: + raise + # Platform-specific work here — for LINE, send a Template + # Buttons "Get answer" bubble using the cached reply token + # so the user can fetch the cached response later via a + # fresh (free) reply token from the postback callback. + await self._send_slow_response_button(chat_id) + + side_task = asyncio.create_task(_fire_at_threshold()) + try: + await super()._keep_typing(chat_id, *args, **kwargs) + finally: + if not side_task.done(): + side_task.cancel() + try: + await side_task + except (asyncio.CancelledError, Exception): + pass +``` + +Key points: + +- **Always `await super()._keep_typing(...)`.** The typing heartbeat is independently useful — don't replace it, layer on top of it. +- **Tear down the side task in `finally`.** When the LLM finishes (or `/stop` cancels the run), the gateway cancels the typing task. Your side task must observe that cancellation too, otherwise it lingers and may fire after the response was already delivered. +- **Pair with `interrupt_session_activity`** to resolve any orphan UX state when the user issues `/stop`. For LINE, this means transitioning the postback cache entry from `PENDING` to `ERROR` so the persistent "Get answer" button delivers a "Run was interrupted" message instead of looping. + +### Pattern: subclass `send` to route through a cache instead of sending immediately + +If your slow-response UX caches the response for later retrieval (LINE's postback flow), your `send` override needs to recognize three modes: + +1. **Pending postback active for this chat** → cache the response under the request_id, don't send anything visible. +2. **System busy-ack** (`⚡ Interrupting`, `⏳ Queued`, `⏩ Steered`) → bypass the cache and send visibly so the user sees the gateway's response to their input. +3. **Normal response** → send via reply-token-or-push as usual. + +```python +async def send(self, chat_id: str, content: str, **kw) -> SendResult: + if _is_system_bypass(content): + return await self._send_text_chunks(chat_id, content, force_push=False) + pending_rid = self._pending_buttons.get(chat_id) + if pending_rid: + self._cache.set_ready(pending_rid, content) + return SendResult(success=True, message_id=pending_rid) + return await self._send_text_chunks(chat_id, content, force_push=False) +``` + +`_SYSTEM_BYPASS_PREFIXES` are the gateway's own busy-acknowledgment prefixes (`⚡`, `⏳`, `⏩`, `💾`). Always let those through visibly, regardless of cached UX state. + +### When this pattern is appropriate + +Use the typing-loop override approach when: + +- The platform's outbound API has a hard time-window constraint (single-use reply token, expiring sticky session, etc.) AND +- A *visible mid-flight bubble* is acceptable UX on that platform. + +Use the simpler `slow_response_threshold = 0` always-Push path when: + +- The platform doesn't have a meaningful free vs. paid distinction, OR +- The user community prefers "loading… loading… DONE" silence-then-response over an interactive intermediate bubble. + +LINE supports both: the threshold defaults to 45s for free postback fetch, and `LINE_SLOW_RESPONSE_THRESHOLD=0` reverts to "always Push fallback." + ### Reference Implementation -See `plugins/platforms/irc/` in the repo for a complete working example — a full async IRC adapter with zero external dependencies. +See `plugins/platforms/line/adapter.py` for the full LINE postback implementation — a `RequestCache` state machine (`PENDING → READY → DELIVERED`, plus `ERROR` for `/stop`), a `_keep_typing` override that fires the Template Buttons bubble at threshold, a `send` override that routes through the cache, and an `interrupt_session_activity` override that resolves orphan PENDING entries. + +### Reference Implementations (Plugin Path) + +See `plugins/platforms/irc/` in the repo for a complete working example — a full async IRC adapter with zero external dependencies. `plugins/platforms/teams/` covers Bot Framework / Adaptive Cards, `plugins/platforms/google_chat/` covers OAuth-based REST APIs, and `plugins/platforms/line/` covers webhook-driven Messaging APIs with platform-specific slow-LLM UX. --- diff --git a/website/docs/reference/environment-variables.md b/website/docs/reference/environment-variables.md index a5b7e777db3..9d7208883b7 100644 --- a/website/docs/reference/environment-variables.md +++ b/website/docs/reference/environment-variables.md @@ -443,6 +443,28 @@ Only used when the [`teams_pipeline` plugin](/docs/user-guide/messaging/msgraph- | `TEAMS_CHANNEL_ID` | Target channel ID (paired with `TEAMS_TEAM_ID`). | | `TEAMS_CHAT_ID` | Target 1:1 or group chat ID (alternative to team+channel for `graph` mode). | +### LINE Messaging API + +Used by the bundled LINE platform plugin (`plugins/platforms/line/`). See [Messaging Gateway → LINE](/docs/user-guide/messaging/line) for full setup. + +| Variable | Description | +|----------|-------------| +| `LINE_CHANNEL_ACCESS_TOKEN` | Long-lived channel access token from the LINE Developers Console (Messaging API tab). Required. | +| `LINE_CHANNEL_SECRET` | Channel secret (Basic settings tab); used for HMAC-SHA256 webhook signature verification. Required. | +| `LINE_HOST` | Webhook bind host (default: `0.0.0.0`). | +| `LINE_PORT` | Webhook bind port (default: `8646`). | +| `LINE_PUBLIC_URL` | Public HTTPS base URL (e.g. `https://my-tunnel.example.com`). Required for image / audio / video sends — LINE only accepts HTTPS-reachable URLs. | +| `LINE_ALLOWED_USERS` | Comma-separated user IDs allowed to DM the bot (`U`-prefixed). | +| `LINE_ALLOWED_GROUPS` | Comma-separated group IDs the bot will respond in (`C`-prefixed). | +| `LINE_ALLOWED_ROOMS` | Comma-separated room IDs the bot will respond in (`R`-prefixed). | +| `LINE_ALLOW_ALL_USERS` | Dev-only escape hatch — accepts any source. Default: `false`. | +| `LINE_HOME_CHANNEL` | Default delivery target for cron jobs with `deliver: line`. | +| `LINE_SLOW_RESPONSE_THRESHOLD` | Seconds before the slow-LLM Template Buttons postback fires (default: `45`). Set `0` to disable and always Push-fallback. | +| `LINE_PENDING_TEXT` | Bubble text shown alongside the postback button. | +| `LINE_BUTTON_LABEL` | Postback button label (default: `Get answer`). | +| `LINE_DELIVERED_TEXT` | Reply when an already-delivered postback is tapped again (default: `Already replied ✅`). | +| `LINE_INTERRUPTED_TEXT` | Reply when a `/stop`-orphaned postback button is tapped (default: `Run was interrupted before completion.`). | + ### Advanced Messaging Tuning Advanced per-platform knobs for throttling the outbound message batcher. Most users never need to touch these; defaults are set to respect each platform's rate limits without feeling sluggish. diff --git a/website/docs/user-guide/messaging/index.md b/website/docs/user-guide/messaging/index.md index b6ed2796c10..b8ac6fecb3b 100644 --- a/website/docs/user-guide/messaging/index.md +++ b/website/docs/user-guide/messaging/index.md @@ -1,12 +1,12 @@ --- sidebar_position: 1 title: "Messaging Gateway" -description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Yuanbao, Microsoft Teams, Webhooks, or any OpenAI-compatible frontend via the API server — architecture and setup overview" +description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Yuanbao, Microsoft Teams, LINE, Webhooks, or any OpenAI-compatible frontend via the API server — architecture and setup overview" --- # Messaging Gateway -Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Feishu/Lark, WeCom, Weixin, BlueBubbles (iMessage), QQ, Yuanbao, Microsoft Teams, or your browser. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages. +Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Feishu/Lark, WeCom, Weixin, BlueBubbles (iMessage), QQ, Yuanbao, Microsoft Teams, LINE, or your browser. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages. For the full voice feature set — including CLI microphone mode, spoken replies in messaging, and Discord voice-channel conversations — see [Voice Mode](/docs/user-guide/features/voice-mode) and [Use Voice Mode with Hermes](/docs/guides/use-voice-mode-with-hermes). @@ -34,6 +34,7 @@ For the full voice feature set — including CLI microphone mode, spoken replies | QQ | ✅ | ✅ | ✅ | — | — | ✅ | — | | Yuanbao | ✅ | ✅ | ✅ | — | — | ✅ | ✅ | | Microsoft Teams | — | ✅ | — | ✅ | — | ✅ | — | +| LINE | — | ✅ | ✅ | — | — | ✅ | — | **Voice** = TTS audio replies and/or voice message transcription. **Images** = send/receive images. **Files** = send/receive file attachments. **Threads** = threaded conversations. **Reactions** = emoji reactions on messages. **Typing** = typing indicator while processing. **Streaming** = progressive message updates via editing. diff --git a/website/docs/user-guide/messaging/line.md b/website/docs/user-guide/messaging/line.md new file mode 100644 index 00000000000..1aa3a753816 --- /dev/null +++ b/website/docs/user-guide/messaging/line.md @@ -0,0 +1,198 @@ +--- +sidebar_position: 17 +title: "LINE" +description: "Set up Hermes Agent as a LINE Messaging API bot" +--- + +# LINE Setup + +Run Hermes Agent as a [LINE](https://line.me/) bot via the official LINE Messaging API. The adapter lives as a bundled platform plugin under `plugins/platforms/line/` — no core edits, just enable it like any other platform. + +LINE is the dominant messaging app in Japan, Taiwan, and Thailand. If your users live there, this is how they reach you. + +## How the bot responds + +| Context | Behavior | +|---------|----------| +| **1:1 chat** (`U` IDs) | Responds to every message | +| **Group chat** (`C` IDs) | Responds when the group is on the allowlist | +| **Multi-user room** (`R` IDs) | Responds when the room is on the allowlist | + +Inbound text, images, audio, video, files, stickers, and locations are all handled. Outbound text uses the **free reply token first** (single-use, ~60s window) and falls back to the metered Push API when the token has expired. + +--- + +## Step 1: Create a LINE Messaging API channel + +1. Go to the [LINE Developers Console](https://developers.line.biz/console/). +2. Create a Provider, then under it a **Messaging API** channel. +3. From the channel's **Basic settings** tab, copy the **Channel secret**. +4. From the **Messaging API** tab, scroll to **Channel access token (long-lived)** and click **Issue**. Copy the token. +5. In the **Messaging API** tab, also disable **Auto-reply messages** and **Greeting messages** so they don't fight your bot's replies. + +--- + +## Step 2: Expose the webhook port + +LINE delivers webhooks over public HTTPS. The default port is `8646` — override with `LINE_PORT` if needed. + +```bash +# Cloudflare Tunnel (recommended for production — fixed hostname) +cloudflared tunnel --url http://localhost:8646 + +# ngrok (good for dev) +ngrok http 8646 + +# devtunnel +devtunnel create hermes-line --allow-anonymous +devtunnel port create hermes-line -p 8646 --protocol https +devtunnel host hermes-line +``` + +Copy the `https://...` URL — you'll set it as the webhook URL below. **Leave the tunnel running** while testing. For production, set up a fixed Cloudflare named tunnel so the webhook URL doesn't change on restart. + +--- + +## Step 3: Configure Hermes + +Add to `~/.hermes/.env`: + +```env +LINE_CHANNEL_ACCESS_TOKEN=YOUR_LONG_LIVED_TOKEN +LINE_CHANNEL_SECRET=YOUR_CHANNEL_SECRET + +# Allowlist — at least one of these (or LINE_ALLOW_ALL_USERS=true for dev) +LINE_ALLOWED_USERS=U1234567890abcdef... # comma-separated U-prefixed IDs +LINE_ALLOWED_GROUPS=C1234567890abcdef... # optional group IDs +LINE_ALLOWED_ROOMS=R1234567890abcdef... # optional room IDs + +# Required for image / audio / video sends — the public HTTPS base URL +# the tunnel resolves to. Without it, send_image/voice/video will refuse. +LINE_PUBLIC_URL=https://my-tunnel.example.com +``` + +Then in `~/.hermes/config.yaml`: + +```yaml +gateway: + platforms: + line: + enabled: true +``` + +That's enough — the bundled-plugin scan in `gateway/config.py` automatically picks up `plugins/platforms/line/`. No `Platform.LINE` enum edit, no `_create_adapter` registration. + +--- + +## Step 4: Set the webhook URL + +Back in the LINE console: + +1. Open your channel → **Messaging API** tab. +2. Under **Webhook settings** → **Webhook URL**, paste `https:///line/webhook` (note the `/line/webhook` path — the adapter listens there). +3. Click **Verify**. LINE pings the URL; you should see a 200. +4. Toggle **Use webhook** to **On**. + +--- + +## Step 5: Run the gateway + +```bash +hermes gateway +``` + +The agent log shows: + +``` +LINE: webhook listening on 0.0.0.0:8646/line/webhook (public: https://my-tunnel.example.com) +``` + +Add the bot as a friend from the LINE app (scan the QR in the channel's **Messaging API** tab) and send it a message. + +--- + +## Slow LLM responses + +LINE's reply token is single-use and expires roughly 60 seconds after the inbound event. Slow LLMs can't reply in time, which would normally force a paid Push API call. + +When the LLM is still running past `LINE_SLOW_RESPONSE_THRESHOLD` seconds (default `45`), the adapter consumes the original reply token to send a **Template Buttons** bubble: + +> 🤔 Still thinking. Tap below to fetch the answer when it's ready. +> +> [ Get answer ] + +The user taps **Get answer** when convenient — that postback delivers a *fresh* reply token, which the adapter uses to send the cached answer (still free). + +State machine: `PENDING → READY → DELIVERED`, plus `ERROR` for cancelled runs (the orphan PENDING resolves to "Run was interrupted before completion." after `/stop` so the persistent button doesn't loop). + +To disable the postback button and always Push-fallback instead: + +```env +LINE_SLOW_RESPONSE_THRESHOLD=0 +``` + +For the postback flow to fire reliably, suppress chatter that would consume the reply token before the threshold: + +```yaml +# ~/.hermes/config.yaml +display: + interim_assistant_messages: false + platforms: + line: + tool_progress: off +``` + +--- + +## Cron / notification delivery + +```env +LINE_HOME_CHANNEL=Uxxxxxxxxxxxxxxxxxxxx # default delivery target +``` + +Cron jobs with `deliver: line` route to `LINE_HOME_CHANNEL`. The adapter ships a standalone Push-only sender so cron jobs work even when cron runs in a separate process from the gateway. + +--- + +## Environment variable reference + +| Variable | Required | Default | Description | +|---|---|---|---| +| `LINE_CHANNEL_ACCESS_TOKEN` | yes | — | Long-lived channel access token | +| `LINE_CHANNEL_SECRET` | yes | — | Channel secret (HMAC-SHA256 webhook verification) | +| `LINE_HOST` | no | `0.0.0.0` | Webhook bind host | +| `LINE_PORT` | no | `8646` | Webhook bind port | +| `LINE_PUBLIC_URL` | for media | — | Public HTTPS base URL; required for image/voice/video sends | +| `LINE_ALLOWED_USERS` | one of | — | Comma-separated user IDs (U-prefixed) | +| `LINE_ALLOWED_GROUPS` | one of | — | Comma-separated group IDs (C-prefixed) | +| `LINE_ALLOWED_ROOMS` | one of | — | Comma-separated room IDs (R-prefixed) | +| `LINE_ALLOW_ALL_USERS` | dev only | `false` | Skip allowlist entirely | +| `LINE_HOME_CHANNEL` | no | — | Default cron / notification delivery target | +| `LINE_SLOW_RESPONSE_THRESHOLD` | no | `45` | Seconds before the postback button fires (`0` = disabled) | +| `LINE_PENDING_TEXT` | no | "🤔 Still thinking…" | Bubble text shown alongside the postback button | +| `LINE_BUTTON_LABEL` | no | "Get answer" | Button label | +| `LINE_DELIVERED_TEXT` | no | "Already replied ✅" | Reply when an already-delivered button is tapped again | +| `LINE_INTERRUPTED_TEXT` | no | "Run was interrupted before completion." | Reply when a `/stop` orphan button is tapped | + +--- + +## Troubleshooting + +**"invalid signature" on webhook verify.** The `Channel secret` was copied wrong, or your tunnel rewrote the request body. Verify with `curl -i https:///line/webhook/health` first — that should return `{"status":"ok","platform":"line"}`. + +**Bot receives nothing in groups.** Check `LINE_ALLOWED_GROUPS` includes the `C...` group ID. To find a group ID, send a test message and grep `~/.hermes/logs/gateway.log` for `LINE: rejecting unauthorized source` — the rejected source dict has the IDs. + +**`send_image` fails with "LINE_PUBLIC_URL must be set".** LINE's Messaging API does not accept binary uploads — images, audio, and video must be reachable HTTPS URLs. Set `LINE_PUBLIC_URL` to the tunnel's public hostname and the adapter will serve files from `/line/media//` automatically. + +**Postback button never appears.** Either the LLM responded faster than `LINE_SLOW_RESPONSE_THRESHOLD`, or another bubble (tool-progress, streaming) consumed the reply token first. See the suppression block under "Slow LLM responses". + +**"already in use by another profile".** The same channel access token is bound to another running Hermes profile. Stop the other gateway or use a separate channel. + +--- + +## Limitations + +* **Single bubble per chunk.** Each LINE text bubble is capped at 5000 characters, and at most 5 bubbles are sent per Reply/Push call. Longer responses are truncated with an ellipsis. +* **No native message editing.** LINE has no edit-message API — streaming responses always send fresh bubbles, never edit prior ones. +* **No Markdown rendering.** Bold (`**`), italics (`*`), code fences, and headings render as literal characters. The adapter strips them before sending; URLs are preserved (`[label](url)` becomes `label (url)`). +* **Loading indicator is DM-only.** LINE rejects the chat/loading API for groups and rooms, so the typing indicator only shows in 1:1 chats. diff --git a/website/sidebars.ts b/website/sidebars.ts index 938eb9c0677..a29f366219a 100644 --- a/website/sidebars.ts +++ b/website/sidebars.ts @@ -141,6 +141,7 @@ const sidebars: SidebarsConfig = { 'user-guide/messaging/teams', 'user-guide/messaging/teams-meetings', 'user-guide/messaging/msgraph-webhook', + 'user-guide/messaging/line', 'user-guide/messaging/open-webui', 'user-guide/messaging/webhooks', ],