hermes-agent/plugins/platforms/homeassistant/adapter.py
kshitijk4poor c37c6eaf29 refactor(gateway): migrate Home Assistant adapter to bundled plugin
Move gateway/platforms/homeassistant.py into plugins/platforms/homeassistant/
following the same shape as the Mattermost and Discord migrations.

  - Adapter file is renamed via git mv (history is preserved).
  - register() exposes the platform via the plugin system instead of the
    hardcoded Platform.HOMEASSISTANT elif in gateway/run.py::build_adapter().
  - _standalone_send() replaces the legacy _send_homeassistant() helper in
    tools/send_message_tool.py.  Out-of-process cron delivery
    (deliver=homeassistant from a cron process not co-located with the
    gateway) now flows through the registry's standalone_sender_fn path
    instead of the hardcoded elif.
  - _is_connected() probes HASS_TOKEN via hermes_cli.gateway.get_env_value
    so existing connected-platform checks behave identically.

The HASS_TOKEN / HASS_URL env-to-PlatformConfig seeding in
gateway/config.py stays in core — same pattern bluebubbles, mattermost,
and discord migrations followed.  No setup_fn or apply_yaml_config_fn is
registered because Home Assistant has no _setup_homeassistant wizard in
hermes_cli/setup.py and no homeassistant: YAML block in config.yaml today;
setup runs through the existing hermes_cli/tools_config.py toolset wizard.

Test imports were rewritten across tests/gateway/test_homeassistant.py,
tests/integration/test_ha_integration.py, and
tests/tools/test_send_message_missing_platforms.py; the legacy
(token, extra, chat_id, message)-shaped _send_homeassistant call site is
preserved via a small SimpleNamespace shim in
test_send_message_missing_platforms.py (same approach used when
mattermost moved).

  - Focused HA suites (64 tests across the three rewritten files) pass.
  - Broader gateway/cron sweep produces 10 failures identical to main
    baseline (telegram approval/model-picker xdist isolation flakes,
    wecom_callback defusedxml issue, cron script_timeout fixture issue).
    Zero net new failures.
2026-06-06 11:46:24 -07:00

577 lines
21 KiB
Python

"""
Home Assistant platform adapter.
Connects to the HA WebSocket API for real-time event monitoring.
State-change events are converted to MessageEvent objects and forwarded
to the agent for processing. Outbound messages are delivered as HA
persistent notifications.
Requires:
- aiohttp (already in messaging extras)
- HASS_TOKEN env var (Long-Lived Access Token)
- HASS_URL env var (default: http://homeassistant.local:8123)
"""
import asyncio
import json
import logging
import os
import time
import uuid
from datetime import datetime
from typing import Any, Dict, Optional, Set
try:
import aiohttp
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
aiohttp = None # type: ignore[assignment]
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
SendResult,
)
logger = logging.getLogger(__name__)
def check_ha_requirements() -> bool:
"""Check if Home Assistant dependencies are available and configured."""
if not AIOHTTP_AVAILABLE:
return False
if not os.getenv("HASS_TOKEN"):
return False
return True
class HomeAssistantAdapter(BasePlatformAdapter):
"""
Home Assistant WebSocket adapter.
Subscribes to ``state_changed`` events and forwards them as
MessageEvent objects. Supports domain/entity filtering and
per-entity cooldowns to avoid event floods.
"""
MAX_MESSAGE_LENGTH = 4096
# Reconnection backoff schedule (seconds)
_BACKOFF_STEPS = [5, 10, 30, 60]
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.HOMEASSISTANT)
# Connection state
self._session: Optional["aiohttp.ClientSession"] = None
self._ws: Optional["aiohttp.ClientWebSocketResponse"] = None
self._rest_session: Optional["aiohttp.ClientSession"] = None
self._listen_task: Optional[asyncio.Task] = None
self._msg_id: int = 0
# Configuration from extra
extra = config.extra or {}
token = config.token or os.getenv("HASS_TOKEN", "")
url = extra.get("url") or os.getenv("HASS_URL", "http://homeassistant.local:8123")
self._hass_url: str = url.rstrip("/")
self._hass_token: str = token
# Event filtering
self._watch_domains: Set[str] = set(extra.get("watch_domains", []))
self._watch_entities: Set[str] = set(extra.get("watch_entities", []))
self._ignore_entities: Set[str] = set(extra.get("ignore_entities", []))
self._watch_all: bool = bool(extra.get("watch_all", False))
self._cooldown_seconds: int = int(extra.get("cooldown_seconds", 30))
# Cooldown tracking: entity_id -> last_event_timestamp
self._last_event_time: Dict[str, float] = {}
def _next_id(self) -> int:
"""Return the next WebSocket message ID."""
self._msg_id += 1
return self._msg_id
# ------------------------------------------------------------------
# Connection lifecycle
# ------------------------------------------------------------------
async def connect(self) -> bool:
"""Connect to HA WebSocket API and subscribe to events."""
if not AIOHTTP_AVAILABLE:
logger.warning("[%s] aiohttp not installed. Run: pip install aiohttp", self.name)
return False
if not self._hass_token:
logger.warning("[%s] No HASS_TOKEN configured", self.name)
return False
try:
success = await self._ws_connect()
if not success:
return False
# Dedicated REST session for send() calls
self._rest_session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30)
)
# Warn if no event filters are configured
if not self._watch_domains and not self._watch_entities and not self._watch_all:
logger.warning(
"[%s] No watch_domains, watch_entities, or watch_all configured. "
"All state_changed events will be dropped. Configure filters in "
"your HA platform config to receive events.",
self.name,
)
# Start background listener
self._listen_task = asyncio.create_task(self._listen_loop())
self._running = True
logger.info("[%s] Connected to %s", self.name, self._hass_url)
return True
except Exception as e:
logger.error("[%s] Failed to connect: %s", self.name, e)
return False
async def _ws_connect(self) -> bool:
"""Establish WebSocket connection and authenticate."""
ws_url = self._hass_url.replace("https://", "wss://").replace("http://", "ws://")
ws_url = f"{ws_url}/api/websocket"
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30)
)
self._ws = await self._session.ws_connect(ws_url, heartbeat=30, timeout=30)
# Step 1: Receive auth_required
msg = await self._ws.receive_json()
if msg.get("type") != "auth_required":
logger.error("Expected auth_required, got: %s", msg.get("type"))
await self._cleanup_ws()
return False
# Step 2: Send auth
await self._ws.send_json({
"type": "auth",
"access_token": self._hass_token,
})
# Step 3: Wait for auth_ok
msg = await self._ws.receive_json()
if msg.get("type") != "auth_ok":
logger.error("Auth failed: %s", msg)
await self._cleanup_ws()
return False
# Step 4: Subscribe to state_changed events
sub_id = self._next_id()
await self._ws.send_json({
"id": sub_id,
"type": "subscribe_events",
"event_type": "state_changed",
})
# Verify subscription acknowledgement
msg = await self._ws.receive_json()
if not msg.get("success"):
logger.error("Failed to subscribe to events: %s", msg)
await self._cleanup_ws()
return False
return True
async def _cleanup_ws(self) -> None:
"""Close WebSocket and session."""
if self._ws and not self._ws.closed:
await self._ws.close()
self._ws = None
if self._session and not self._session.closed:
await self._session.close()
self._session = None
async def disconnect(self) -> None:
"""Disconnect from Home Assistant."""
self._running = False
if self._listen_task:
self._listen_task.cancel()
try:
await self._listen_task
except asyncio.CancelledError:
pass
self._listen_task = None
await self._cleanup_ws()
if self._rest_session and not self._rest_session.closed:
await self._rest_session.close()
self._rest_session = None
logger.info("[%s] Disconnected", self.name)
# ------------------------------------------------------------------
# Event listener
# ------------------------------------------------------------------
async def _listen_loop(self) -> None:
"""Main event loop with automatic reconnection."""
backoff_idx = 0
while self._running:
try:
await self._read_events()
except asyncio.CancelledError:
return
except Exception as e:
logger.warning("[%s] WebSocket error: %s", self.name, e)
if not self._running:
return
# Reconnect with backoff
delay = self._BACKOFF_STEPS[min(backoff_idx, len(self._BACKOFF_STEPS) - 1)]
logger.info("[%s] Reconnecting in %ds...", self.name, delay)
await asyncio.sleep(delay)
backoff_idx += 1
try:
await self._cleanup_ws()
success = await self._ws_connect()
if success:
backoff_idx = 0 # Reset on successful reconnect
logger.info("[%s] Reconnected", self.name)
except Exception as e:
logger.warning("[%s] Reconnection failed: %s", self.name, e)
async def _read_events(self) -> None:
"""Read events from WebSocket until disconnected."""
if self._ws is None or self._ws.closed:
return
async for ws_msg in self._ws:
if ws_msg.type == aiohttp.WSMsgType.TEXT:
try:
data = json.loads(ws_msg.data)
if data.get("type") == "event":
await self._handle_ha_event(data.get("event", {}))
except json.JSONDecodeError:
logger.debug("Invalid JSON from HA WS: %s", ws_msg.data[:200])
elif ws_msg.type in {aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR}:
break
async def _handle_ha_event(self, event: Dict[str, Any]) -> None:
"""Process a state_changed event from Home Assistant."""
event_data = event.get("data", {})
entity_id: str = event_data.get("entity_id", "")
if not entity_id:
return
# Apply ignore filter
if entity_id in self._ignore_entities:
return
# Apply domain/entity watch filters (closed by default — require
# explicit watch_domains, watch_entities, or watch_all to forward)
domain = entity_id.split(".")[0] if "." in entity_id else ""
if self._watch_domains or self._watch_entities:
domain_match = domain in self._watch_domains if self._watch_domains else False
entity_match = entity_id in self._watch_entities if self._watch_entities else False
if not domain_match and not entity_match:
return
elif not self._watch_all:
# No filters configured and watch_all is off — drop the event
return
# Apply cooldown
now = time.time()
last = self._last_event_time.get(entity_id, 0)
if (now - last) < self._cooldown_seconds:
return
self._last_event_time[entity_id] = now
# Build human-readable message
old_state = event_data.get("old_state", {})
new_state = event_data.get("new_state", {})
message = self._format_state_change(entity_id, old_state, new_state)
if not message:
return
# Build MessageEvent and forward to handler
source = self.build_source(
chat_id="ha_events",
chat_name="Home Assistant Events",
chat_type="channel",
user_id="homeassistant",
user_name="Home Assistant",
)
msg_event = MessageEvent(
text=message,
message_type=MessageType.TEXT,
source=source,
message_id=f"ha_{entity_id}_{int(now)}",
timestamp=datetime.now(),
)
await self.handle_message(msg_event)
@staticmethod
def _format_state_change(
entity_id: str,
old_state: Dict[str, Any],
new_state: Dict[str, Any],
) -> Optional[str]:
"""Convert a state_changed event into a human-readable description."""
if not new_state:
return None
old_val = old_state.get("state", "unknown") if old_state else "unknown"
new_val = new_state.get("state", "unknown")
# Skip if state didn't actually change
if old_val == new_val:
return None
friendly_name = new_state.get("attributes", {}).get("friendly_name", entity_id)
domain = entity_id.split(".")[0] if "." in entity_id else ""
# Domain-specific formatting
if domain == "climate":
attrs = new_state.get("attributes", {})
temp = attrs.get("current_temperature", "?")
target = attrs.get("temperature", "?")
return (
f"[Home Assistant] {friendly_name}: HVAC mode changed from "
f"'{old_val}' to '{new_val}' (current: {temp}, target: {target})"
)
if domain == "sensor":
unit = new_state.get("attributes", {}).get("unit_of_measurement", "")
return (
f"[Home Assistant] {friendly_name}: changed from "
f"{old_val}{unit} to {new_val}{unit}"
)
if domain == "binary_sensor":
return (
f"[Home Assistant] {friendly_name}: "
f"{'triggered' if new_val == 'on' else 'cleared'} "
f"(was {'triggered' if old_val == 'on' else 'cleared'})"
)
if domain in {"light", "switch", "fan"}:
return (
f"[Home Assistant] {friendly_name}: turned "
f"{'on' if new_val == 'on' else 'off'}"
)
if domain == "alarm_control_panel":
return (
f"[Home Assistant] {friendly_name}: alarm state changed from "
f"'{old_val}' to '{new_val}'"
)
# Generic fallback
return (
f"[Home Assistant] {friendly_name} ({entity_id}): "
f"changed from '{old_val}' to '{new_val}'"
)
# ------------------------------------------------------------------
# Outbound messaging
# ------------------------------------------------------------------
async def send(
self,
chat_id: str,
content: str,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Send a notification via HA REST API (persistent_notification.create).
Uses the REST API instead of WebSocket to avoid a race condition
with the event listener loop that reads from the same WS connection.
"""
url = f"{self._hass_url}/api/services/persistent_notification/create"
headers = {
"Authorization": f"Bearer {self._hass_token}",
"Content-Type": "application/json",
}
payload = {
"title": "Hermes Agent",
"message": content[:self.MAX_MESSAGE_LENGTH],
}
try:
if self._rest_session:
async with self._rest_session.post(
url,
headers=headers,
json=payload,
timeout=aiohttp.ClientTimeout(total=10),
) as resp:
if resp.status < 300:
return SendResult(success=True, message_id=uuid.uuid4().hex[:12])
else:
body = await resp.text()
return SendResult(success=False, error=f"HTTP {resp.status}: {body}")
else:
async with aiohttp.ClientSession() as session:
async with session.post(
url,
headers=headers,
json=payload,
timeout=aiohttp.ClientTimeout(total=10),
) as resp:
if resp.status < 300:
return SendResult(success=True, message_id=uuid.uuid4().hex[:12])
else:
body = await resp.text()
return SendResult(success=False, error=f"HTTP {resp.status}: {body}")
except asyncio.TimeoutError:
return SendResult(success=False, error="Timeout sending notification to HA")
except Exception as e:
return SendResult(success=False, error=str(e))
async def send_typing(self, chat_id: str, metadata=None) -> None:
"""No typing indicator for Home Assistant."""
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
"""Return basic info about the HA event channel."""
return {
"name": "Home Assistant Events",
"type": "channel",
"url": self._hass_url,
}
# ---------------------------------------------------------------------------
# Standalone (out-of-process) sender — used by cron deliver=homeassistant
# ---------------------------------------------------------------------------
async def _standalone_send(
pconfig,
chat_id: str,
message: str,
*,
thread_id: Optional[str] = None,
media_files: Optional[list] = None,
force_document: bool = False,
) -> Dict[str, Any]:
"""Send a notification via the HA ``notify.notify`` service without a
live gateway adapter.
Used by ``tools/send_message_tool._send_via_adapter`` when the gateway
runner is not in this process (typical for cron jobs running
out-of-process). The HTTP path is the same one the legacy
``_send_homeassistant`` helper used in ``tools/send_message_tool.py``
before this migration.
Reads ``HASS_TOKEN`` from ``pconfig.token`` (set by the gateway config
loader from env) and falls back to the ``HASS_TOKEN`` env var. Server
URL comes from ``pconfig.extra["url"]`` (seeded by the env loader in
``gateway/config.py``) or the ``HASS_URL`` env var.
``thread_id``, ``media_files`` and ``force_document`` are accepted for
signature parity with other standalone senders. HA notifications have
no native threading or attachment model — these arguments are ignored.
"""
if not AIOHTTP_AVAILABLE:
return {"error": "aiohttp not installed. Run: pip install aiohttp"}
extra = getattr(pconfig, "extra", {}) or {}
hass_url = (extra.get("url") or os.getenv("HASS_URL", "")).rstrip("/")
token = (getattr(pconfig, "token", None) or os.getenv("HASS_TOKEN", "")).strip()
if not hass_url or not token:
return {
"error": (
"Home Assistant standalone send: HASS_URL and HASS_TOKEN "
"must both be set"
)
}
url = f"{hass_url}/api/services/notify/notify"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
payload = {"message": message, "target": chat_id}
try:
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30)
) as session:
async with session.post(url, headers=headers, json=payload) as resp:
if resp.status not in {200, 201}:
body = await resp.text()
return {
"error": (
f"Home Assistant API error ({resp.status}): {body}"
)
}
return {
"success": True,
"platform": "homeassistant",
"chat_id": chat_id,
}
except asyncio.TimeoutError:
return {"error": "Timeout sending notification to Home Assistant"}
except Exception as e:
return {"error": f"Home Assistant send failed: {e}"}
# ---------------------------------------------------------------------------
# is_connected probe
# ---------------------------------------------------------------------------
def _is_connected(config) -> bool:
"""Home Assistant is considered connected when ``HASS_TOKEN`` is set.
Looks up via ``hermes_cli.gateway.get_env_value`` at call time (not via
the plugin's own bound import) so tests that patch
``gateway_mod.get_env_value`` can suppress ambient ``HASS_TOKEN`` env
vars. Matches what the legacy connected-platforms check did before
this migration.
"""
import hermes_cli.gateway as gateway_mod
return bool((gateway_mod.get_env_value("HASS_TOKEN") or "").strip())
# ---------------------------------------------------------------------------
# Plugin registration entry point
# ---------------------------------------------------------------------------
def _build_adapter(config):
"""Factory wrapper that constructs HomeAssistantAdapter from a PlatformConfig."""
return HomeAssistantAdapter(config)
def register(ctx) -> None:
"""Plugin entry point — called by the Hermes plugin system."""
ctx.register_platform(
name="homeassistant",
label="Home Assistant",
adapter_factory=_build_adapter,
check_fn=check_ha_requirements,
is_connected=_is_connected,
required_env=["HASS_TOKEN"],
install_hint="pip install aiohttp",
# Out-of-process cron delivery via the HA ``notify.notify`` service.
# Without this hook, ``deliver=homeassistant`` cron jobs would fail
# with "No live adapter" when cron runs separately from the gateway.
# Mirrors the Discord / Teams / Mattermost pattern.
standalone_sender_fn=_standalone_send,
# HA notification message cap — matches MAX_MESSAGE_LENGTH on the
# adapter class above.
max_message_length=HomeAssistantAdapter.MAX_MESSAGE_LENGTH,
# Display
emoji="🏠",
allow_update_command=True,
)