diff --git a/gateway/dead_targets.py b/gateway/dead_targets.py new file mode 100644 index 00000000000..66a9247f213 --- /dev/null +++ b/gateway/dead_targets.py @@ -0,0 +1,143 @@ +"""Persistent registry of delivery targets that are confirmed unreachable. + +When a messaging platform reports that a target chat is permanently gone — a +deleted group (``Forbidden: the group chat was deleted``), a bot kicked/blocked, +or a deactivated user — re-sending to it on every cron tick or every fan-out +delivery wastes a send attempt against the platform's flood-control envelope and +spams the logs. This registry lets the delivery layer short-circuit a target it +has already proven dead, while staying self-healing: any successful send to that +target clears the flag, so a user who re-adds the bot (or restores the chat) +recovers automatically with no manual cleanup. + +Scope is deliberately narrow. Only *whole-chat* deaths are recorded — the +``forbidden`` and chat-level ``not_found`` (``chat not found``) error kinds. +Thread/topic-level ``not_found`` is NOT recorded here: the adapters already +self-heal that by retrying without ``reply_to`` (see the Telegram adapter's +reply-target-deleted path), and a deleted topic does not mean the parent chat is +dead. + +The store is a small JSON file under the active profile's HERMES_HOME so each +profile keeps its own dead set. Reads/writes are best-effort: a corrupt or +unwritable file degrades to an in-memory-only registry rather than raising on +the delivery path. +""" + +from __future__ import annotations + +import json +import logging +import threading +import time +from pathlib import Path +from typing import Dict, Optional + +from hermes_cli.config import get_hermes_home + +logger = logging.getLogger(__name__) + +# Error kinds (from gateway.platforms.base.classify_send_error) that mean the +# *whole chat* is unreachable, not a transient or thread-level problem. +_DEAD_ERROR_KINDS = frozenset({"forbidden", "not_found"}) + + +def _normalize(platform: str, chat_id: str) -> str: + """Canonical key for a (platform, chat_id) pair.""" + return f"{str(platform).strip().lower()}:{str(chat_id).strip()}" + + +class DeadTargetRegistry: + """Thread-safe, persistent set of confirmed-dead delivery targets. + + Keyed on ``platform:chat_id``. Stores the reason and a timestamp for + observability. Self-healing: :meth:`clear` (called on a successful send) + removes the flag. + """ + + def __init__(self, path: Optional[Path] = None) -> None: + self._lock = threading.RLock() + self._dead: Dict[str, Dict[str, object]] = {} + if path is not None: + self._path = path + else: + self._path = get_hermes_home() / "gateway" / "dead_targets.json" + self._load() + + # -- persistence ------------------------------------------------------- + + def _load(self) -> None: + try: + if self._path.exists(): + raw = json.loads(self._path.read_text()) + if isinstance(raw, dict): + # Only keep well-shaped entries. + self._dead = { + k: v for k, v in raw.items() if isinstance(v, dict) + } + except (OSError, ValueError) as exc: + logger.debug("dead_targets: could not load %s (%s) — starting empty", + self._path, exc) + self._dead = {} + + def _flush_locked(self) -> None: + try: + self._path.parent.mkdir(parents=True, exist_ok=True) + tmp = self._path.with_suffix(self._path.suffix + ".tmp") + tmp.write_text(json.dumps(self._dead, indent=2)) + tmp.replace(self._path) + except OSError as exc: + # Best-effort: keep the in-memory state, don't break delivery. + logger.debug("dead_targets: could not persist %s (%s)", self._path, exc) + + # -- public API -------------------------------------------------------- + + @staticmethod + def is_dead_error_kind(error_kind: Optional[str]) -> bool: + """Return True when ``error_kind`` denotes a permanent whole-chat death.""" + return bool(error_kind) and error_kind in _DEAD_ERROR_KINDS + + def is_dead(self, platform: str, chat_id: Optional[str]) -> bool: + if not chat_id: + return False + with self._lock: + return _normalize(platform, chat_id) in self._dead + + def mark_dead(self, platform: str, chat_id: Optional[str], + reason: str = "") -> bool: + """Record a target as confirmed-dead. Returns True if newly added.""" + if not chat_id: + return False + key = _normalize(platform, chat_id) + with self._lock: + existed = key in self._dead + self._dead[key] = { + "platform": str(platform).strip().lower(), + "chat_id": str(chat_id), + "reason": str(reason)[:200], + "marked_at": time.time(), + } + self._flush_locked() + if not existed: + logger.info( + "dead_targets: marked %s as unreachable (%s) — future deliveries " + "to this target will be skipped until a send succeeds", + key, reason or "no reason given", + ) + return not existed + + def clear(self, platform: str, chat_id: Optional[str]) -> bool: + """Remove a target's dead flag (self-healing). Returns True if it was set.""" + if not chat_id: + return False + key = _normalize(platform, chat_id) + with self._lock: + if key in self._dead: + del self._dead[key] + self._flush_locked() + logger.info("dead_targets: cleared %s (delivery succeeded again)", key) + return True + return False + + def all_dead(self) -> Dict[str, Dict[str, object]]: + """Snapshot of the current dead set (for diagnostics / `hermes` CLI).""" + with self._lock: + return {k: dict(v) for k, v in self._dead.items()} diff --git a/gateway/delivery.py b/gateway/delivery.py index faec3ca45eb..58280371ce1 100644 --- a/gateway/delivery.py +++ b/gateway/delivery.py @@ -56,6 +56,7 @@ def _is_silence_narration(content: Optional[str]) -> bool: from .config import Platform, GatewayConfig from .session import SessionSource +from .dead_targets import DeadTargetRegistry def _looks_like_telegram_private_chat_id(chat_id: Optional[str]) -> bool: @@ -96,6 +97,32 @@ def _is_thread_not_found_delivery_error(result: Any) -> bool: return bool(error and "thread not found" in error.lower()) +def _send_result_error_kind(result: Any) -> Optional[str]: + """Return the machine-readable error_kind from a SendResult/dict, if any.""" + if isinstance(result, dict): + kind = result.get("error_kind") + else: + kind = getattr(result, "error_kind", None) + return str(kind) if kind else None + + +def _classify_dead_from_error_text(error_text: Optional[str]) -> Optional[str]: + """Best-effort dead-target classification from a raised error's text. + + ``_deliver_to_platform`` raises (it does not return a SendResult) on a hard + failure, so the ``deliver()`` loop only has the exception string. Reuse the + platform-neutral classifier to recover the error_kind from that text. + """ + if not error_text: + return None + try: + from .platforms.base import classify_send_error + except Exception: # pragma: no cover - import guard + return None + kind = classify_send_error(None, error_text=error_text) + return kind if DeadTargetRegistry.is_dead_error_kind(kind) else None + + @dataclass class DeliveryTarget: """ @@ -185,17 +212,21 @@ class DeliveryRouter: messages to the right platform adapters. """ - def __init__(self, config: GatewayConfig, adapters: Dict[Platform, Any] = None): + def __init__(self, config: GatewayConfig, adapters: Dict[Platform, Any] = None, + dead_targets: Optional[DeadTargetRegistry] = None): """ Initialize the delivery router. Args: config: Gateway configuration adapters: Dict mapping platforms to their adapter instances + dead_targets: Optional shared registry of confirmed-unreachable + targets. When omitted, a profile-local registry is created. """ self.config = config self.adapters = adapters or {} self.output_dir = get_hermes_home() / "cron" / "output" + self.dead_targets = dead_targets or DeadTargetRegistry() async def deliver( self, @@ -221,17 +252,50 @@ class DeliveryRouter: results = {} for target in targets: + # Skip targets we've already proven permanently unreachable + # (deleted group, blocked/kicked bot, deactivated user). Re-sending + # to them on every tick wastes a send against flood control and + # spams logs. Self-healing: a later successful send clears the flag. + # LOCAL/origin-without-chat targets are never dead-tracked. + if ( + target.platform != Platform.LOCAL + and target.chat_id + and self.dead_targets.is_dead(target.platform.value, target.chat_id) + ): + logger.info( + "Skipping delivery to known-dead target %s:%s " + "(send to it again to clear)", + target.platform.value, target.chat_id, + ) + results[target.to_string()] = { + "success": False, + "skipped": "dead_target", + "error": "target previously confirmed unreachable", + } + continue try: if target.platform == Platform.LOCAL: result = self._deliver_local(content, job_id, job_name, metadata) else: result = await self._deliver_to_platform(target, content, metadata) + # Successful platform delivery — clear any stale dead flag. + if target.chat_id and not _send_result_failed(result): + self.dead_targets.clear(target.platform.value, target.chat_id) results[target.to_string()] = { "success": True, "result": result } except Exception as e: + # A hard failure raises here. If the platform reported a + # whole-chat death, record it so future deliveries short-circuit. + if target.platform != Platform.LOCAL and target.chat_id: + dead_kind = _classify_dead_from_error_text(str(e)) + if dead_kind: + self.dead_targets.mark_dead( + target.platform.value, target.chat_id, + reason=f"{dead_kind}: {str(e)[:120]}", + ) results[target.to_string()] = { "success": False, "error": str(e) diff --git a/infographic/dead-delivery-targets/infographic.png b/infographic/dead-delivery-targets/infographic.png new file mode 100644 index 00000000000..9dbf3431ee5 Binary files /dev/null and b/infographic/dead-delivery-targets/infographic.png differ diff --git a/tests/gateway/test_dead_targets.py b/tests/gateway/test_dead_targets.py new file mode 100644 index 00000000000..92c68ee312b --- /dev/null +++ b/tests/gateway/test_dead_targets.py @@ -0,0 +1,169 @@ +"""Tests for confirmed-dead delivery-target short-circuiting (deleted Telegram +groups, blocked/kicked bots, deactivated users). + +Covers the full lifecycle through the real ``DeliveryRouter.deliver()`` path: + forbidden send -> target marked dead + next delivery -> short-circuited (adapter never called) + successful send -> dead flag cleared (self-healing) + +and the standalone ``DeadTargetRegistry`` persistence/classification contract. +""" + +import pytest + +from gateway.config import GatewayConfig, Platform +from gateway.delivery import DeliveryRouter, DeliveryTarget +from gateway.dead_targets import DeadTargetRegistry + + +class ForbiddenThenOkAdapter: + """First send raises a deleted-group Forbidden; subsequent sends succeed.""" + + def __init__(self, fail_times=1): + self.calls = [] + self._fail_times = fail_times + + async def send(self, chat_id, content, metadata=None): + self.calls.append(chat_id) + if len(self.calls) <= self._fail_times: + raise RuntimeError("Forbidden: the group chat was deleted") + return {"success": True} + + +class TransientFailAdapter: + async def send(self, chat_id, content, metadata=None): + raise RuntimeError("httpx.ReadTimeout: connection timed out") + + +@pytest.fixture +def isolate(tmp_path, monkeypatch): + monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("gateway.dead_targets.get_hermes_home", lambda: tmp_path) + return tmp_path + + +# -------------------------------------------------------------------------- +# DeadTargetRegistry unit contract +# -------------------------------------------------------------------------- + +class TestDeadTargetRegistry: + def test_mark_is_dead_clear_roundtrip(self, isolate): + reg = DeadTargetRegistry() + assert reg.is_dead("telegram", "123") is False + assert reg.mark_dead("telegram", "123", "forbidden") is True + assert reg.is_dead("telegram", "123") is True + # idempotent: second mark returns False (already present) + assert reg.mark_dead("telegram", "123", "forbidden") is False + assert reg.clear("telegram", "123") is True + assert reg.is_dead("telegram", "123") is False + + def test_persists_across_instances(self, isolate): + reg = DeadTargetRegistry() + reg.mark_dead("telegram", "999", "deleted group") + # New instance reads the same on-disk store under tmp HERMES_HOME. + reg2 = DeadTargetRegistry() + assert reg2.is_dead("telegram", "999") is True + + def test_key_is_case_insensitive_on_platform(self, isolate): + reg = DeadTargetRegistry() + reg.mark_dead("TeleGram", "5", "x") + assert reg.is_dead("telegram", "5") is True + + def test_none_chat_id_is_never_dead(self, isolate): + reg = DeadTargetRegistry() + assert reg.mark_dead("telegram", None) is False + assert reg.is_dead("telegram", None) is False + + def test_is_dead_error_kind_classification(self): + assert DeadTargetRegistry.is_dead_error_kind("forbidden") is True + assert DeadTargetRegistry.is_dead_error_kind("not_found") is True + assert DeadTargetRegistry.is_dead_error_kind("rate_limited") is False + assert DeadTargetRegistry.is_dead_error_kind("transient") is False + assert DeadTargetRegistry.is_dead_error_kind(None) is False + + def test_corrupt_store_degrades_to_empty(self, isolate): + path = isolate / "gateway" / "dead_targets.json" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("{ this is not json") + reg = DeadTargetRegistry() # must not raise + assert reg.all_dead() == {} + + +# -------------------------------------------------------------------------- +# DeliveryRouter end-to-end lifecycle +# -------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_forbidden_marks_target_dead_then_short_circuits(isolate): + adapter = ForbiddenThenOkAdapter(fail_times=99) + router = DeliveryRouter(GatewayConfig(), adapters={Platform.TELEGRAM: adapter}) + target = DeliveryTarget.parse("telegram:42") + + # First delivery: send raises Forbidden -> failure + target recorded dead. + res1 = await router.deliver("hi", [target]) + assert res1["telegram:42"]["success"] is False + assert router.dead_targets.is_dead("telegram", "42") is True + assert adapter.calls == ["42"] # adapter was invoked once + + # Second delivery: short-circuited, adapter NOT called again. + res2 = await router.deliver("hi again", [target]) + assert res2["telegram:42"]["skipped"] == "dead_target" + assert res2["telegram:42"]["success"] is False + assert adapter.calls == ["42"] # still only the original call + + +@pytest.mark.asyncio +async def test_successful_send_clears_dead_flag(isolate): + # Fails once (gets marked dead), then succeeds. + adapter = ForbiddenThenOkAdapter(fail_times=1) + router = DeliveryRouter(GatewayConfig(), adapters={Platform.TELEGRAM: adapter}) + target = DeliveryTarget.parse("telegram:7") + + # Pre-seed dead via the first (failing) delivery. + await router.deliver("a", [target]) + assert router.dead_targets.is_dead("telegram", "7") is True + + # Manually clear to simulate the user re-adding the bot, then deliver again. + router.dead_targets.clear("telegram", "7") + res = await router.deliver("b", [target]) + assert res["telegram:7"]["success"] is True + # Flag stays cleared after a successful send. + assert router.dead_targets.is_dead("telegram", "7") is False + + +@pytest.mark.asyncio +async def test_transient_failure_does_not_mark_dead(isolate): + adapter = TransientFailAdapter() + router = DeliveryRouter(GatewayConfig(), adapters={Platform.TELEGRAM: adapter}) + target = DeliveryTarget.parse("telegram:13") + + res = await router.deliver("hi", [target]) + assert res["telegram:13"]["success"] is False + # A timeout/transient error must NOT mark the chat dead — it may recover. + assert router.dead_targets.is_dead("telegram", "13") is False + + +@pytest.mark.asyncio +async def test_local_target_is_never_dead_tracked(isolate): + router = DeliveryRouter(GatewayConfig(), adapters={}) + target = DeliveryTarget.parse("local") + res = await router.deliver("hi", [target]) + assert res["local"]["success"] is True + assert router.dead_targets.all_dead() == {} + + +@pytest.mark.asyncio +async def test_shared_registry_is_used_when_injected(isolate): + shared = DeadTargetRegistry() + shared.mark_dead("telegram", "500", "pre-existing") + adapter = ForbiddenThenOkAdapter(fail_times=0) + router = DeliveryRouter( + GatewayConfig(), + adapters={Platform.TELEGRAM: adapter}, + dead_targets=shared, + ) + target = DeliveryTarget.parse("telegram:500") + res = await router.deliver("hi", [target]) + # Injected registry's pre-existing flag short-circuits before any send. + assert res["telegram:500"]["skipped"] == "dead_target" + assert adapter.calls == []