mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-30 11:52:04 +00:00
fix(gateway): skip confirmed-dead delivery targets (deleted groups, blocked bots) (#55115)
* fix(gateway): skip confirmed-dead delivery targets (deleted groups, blocked bots) A deleted Telegram group, kicked/blocked bot, or deactivated user keeps throwing Forbidden/not_found on every cron tick and fan-out delivery. Each retry burns a send against the platform's flood-control envelope and spams the logs, making the whole session feel broken even when the model call completed. Add a small persistent DeadTargetRegistry (per-profile JSON under HERMES_HOME) that records a target the moment a send reports a whole-chat death (forbidden / chat-level not_found), and have DeliveryRouter.deliver() short-circuit it on subsequent attempts. Self-healing: any successful send clears the flag, so a user re-adding the bot recovers with no manual cleanup. Thread/topic-level not_found is NOT recorded (adapters already self-heal that by retrying without reply_to). Transient/timeout errors are never marked dead. * infographic: dead delivery target skipping
This commit is contained in:
parent
d417ffb363
commit
290fa7fd2b
4 changed files with 377 additions and 1 deletions
143
gateway/dead_targets.py
Normal file
143
gateway/dead_targets.py
Normal file
|
|
@ -0,0 +1,143 @@
|
|||
"""Persistent registry of delivery targets that are confirmed unreachable.
|
||||
|
||||
When a messaging platform reports that a target chat is permanently gone — a
|
||||
deleted group (``Forbidden: the group chat was deleted``), a bot kicked/blocked,
|
||||
or a deactivated user — re-sending to it on every cron tick or every fan-out
|
||||
delivery wastes a send attempt against the platform's flood-control envelope and
|
||||
spams the logs. This registry lets the delivery layer short-circuit a target it
|
||||
has already proven dead, while staying self-healing: any successful send to that
|
||||
target clears the flag, so a user who re-adds the bot (or restores the chat)
|
||||
recovers automatically with no manual cleanup.
|
||||
|
||||
Scope is deliberately narrow. Only *whole-chat* deaths are recorded — the
|
||||
``forbidden`` and chat-level ``not_found`` (``chat not found``) error kinds.
|
||||
Thread/topic-level ``not_found`` is NOT recorded here: the adapters already
|
||||
self-heal that by retrying without ``reply_to`` (see the Telegram adapter's
|
||||
reply-target-deleted path), and a deleted topic does not mean the parent chat is
|
||||
dead.
|
||||
|
||||
The store is a small JSON file under the active profile's HERMES_HOME so each
|
||||
profile keeps its own dead set. Reads/writes are best-effort: a corrupt or
|
||||
unwritable file degrades to an in-memory-only registry rather than raising on
|
||||
the delivery path.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional
|
||||
|
||||
from hermes_cli.config import get_hermes_home
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Error kinds (from gateway.platforms.base.classify_send_error) that mean the
# *whole chat* is unreachable, not a transient or thread-level problem.
# NOTE(review): the module docstring says thread/topic-level ``not_found`` must
# not be recorded, but membership in this set cannot tell it apart from a
# chat-level ``not_found`` — presumably callers filter thread-level failures
# before consulting it; confirm.
_DEAD_ERROR_KINDS = frozenset({"forbidden", "not_found"})
|
||||
|
||||
|
||||
def _normalize(platform: str, chat_id: str) -> str:
|
||||
"""Canonical key for a (platform, chat_id) pair."""
|
||||
return f"{str(platform).strip().lower()}:{str(chat_id).strip()}"
|
||||
|
||||
|
||||
class DeadTargetRegistry:
    """Thread-safe, persistent set of confirmed-dead delivery targets.

    Entries are keyed on ``platform:chat_id`` and carry the reason and a
    ``marked_at`` timestamp for observability. The set is mirrored to a JSON
    file; persistence is best-effort, so a missing, corrupt, or unwritable
    file degrades to an in-memory registry instead of raising on the delivery
    path.

    Self-healing: :meth:`clear` (called on a successful send) removes the flag.

    NOTE(review): a caller that checks :meth:`is_dead` before every send never
    reaches :meth:`clear` for a flagged target — confirm that some send path
    bypasses the check, otherwise a flag can only be removed manually.
    """

    def __init__(self, path: Optional[Path] = None) -> None:
        """Create the registry and load any previously persisted entries.

        Args:
            path: Location of the JSON store. When omitted, the store lives at
                ``<HERMES_HOME>/gateway/dead_targets.json``.
        """
        self._lock = threading.RLock()
        self._dead: Dict[str, Dict[str, object]] = {}
        if path is not None:
            # Accept plain strings as well as Path objects.
            self._path = Path(path)
        else:
            self._path = get_hermes_home() / "gateway" / "dead_targets.json"
        self._load()

    # -- persistence -------------------------------------------------------

    def _load(self) -> None:
        """Populate the in-memory set from disk, starting empty on any failure."""
        try:
            # Explicit encoding: the store must not depend on the host locale.
            raw = json.loads(self._path.read_text(encoding="utf-8"))
        except FileNotFoundError:
            # No store yet — nothing has been marked dead.
            return
        except (OSError, ValueError) as exc:
            # ValueError covers both malformed JSON and undecodable bytes.
            logger.debug("dead_targets: could not load %s (%s) — starting empty",
                         self._path, exc)
            self._dead = {}
            return
        if isinstance(raw, dict):
            # Only keep well-shaped entries.
            self._dead = {k: v for k, v in raw.items() if isinstance(v, dict)}

    def _flush_locked(self) -> None:
        """Write the current set to disk atomically. Caller must hold the lock.

        The data is written to a sibling ``.tmp`` file and then renamed over
        the real store so readers never observe a half-written file.
        """
        tmp: Optional[Path] = None
        try:
            tmp = self._path.with_suffix(self._path.suffix + ".tmp")
            self._path.parent.mkdir(parents=True, exist_ok=True)
            tmp.write_text(json.dumps(self._dead, indent=2), encoding="utf-8")
            tmp.replace(self._path)
        except (OSError, TypeError, ValueError) as exc:
            # Best-effort: keep the in-memory state, don't break delivery.
            # TypeError/ValueError cover entries that cannot be serialized.
            logger.debug("dead_targets: could not persist %s (%s)", self._path, exc)
            if tmp is not None:
                try:
                    tmp.unlink()
                except OSError:
                    # Nothing to clean up, or cleanup itself is impossible.
                    pass

    # -- public API --------------------------------------------------------

    @staticmethod
    def is_dead_error_kind(error_kind: Optional[str]) -> bool:
        """Return True when ``error_kind`` denotes a permanent whole-chat death."""
        # The isinstance guard also keeps unhashable values from raising.
        return isinstance(error_kind, str) and error_kind in _DEAD_ERROR_KINDS

    def is_dead(self, platform: str, chat_id: Optional[str]) -> bool:
        """Return True when the target is currently flagged as unreachable.

        A missing/empty ``chat_id`` is never considered dead.
        """
        if not chat_id:
            return False
        with self._lock:
            return _normalize(platform, chat_id) in self._dead

    def mark_dead(self, platform: str, chat_id: Optional[str],
                  reason: str = "") -> bool:
        """Record a target as confirmed-dead. Returns True if newly added.

        Re-marking an already flagged target refreshes its reason and
        timestamp and returns False. A missing/empty ``chat_id`` is ignored.
        """
        if not chat_id:
            return False
        key = _normalize(platform, chat_id)
        with self._lock:
            existed = key in self._dead
            self._dead[key] = {
                "platform": str(platform).strip().lower(),
                # Stripped so the stored value agrees with the key.
                "chat_id": str(chat_id).strip(),
                "reason": str(reason)[:200],
                "marked_at": time.time(),
            }
            self._flush_locked()
        if not existed:
            logger.info(
                "dead_targets: marked %s as unreachable (%s) — future deliveries "
                "to this target will be skipped until a send succeeds",
                key, reason or "no reason given",
            )
        return not existed

    def clear(self, platform: str, chat_id: Optional[str]) -> bool:
        """Remove a target's dead flag (self-healing). Returns True if it was set."""
        if not chat_id:
            return False
        key = _normalize(platform, chat_id)
        with self._lock:
            if key not in self._dead:
                return False
            del self._dead[key]
            self._flush_locked()
        logger.info("dead_targets: cleared %s (delivery succeeded again)", key)
        return True

    def all_dead(self) -> Dict[str, Dict[str, object]]:
        """Snapshot of the current dead set (for diagnostics / `hermes` CLI).

        Each entry is copied, so mutating the result does not affect the
        registry.
        """
        with self._lock:
            return {k: dict(v) for k, v in self._dead.items()}
|
||||
|
|
@ -56,6 +56,7 @@ def _is_silence_narration(content: Optional[str]) -> bool:
|
|||
|
||||
from .config import Platform, GatewayConfig
|
||||
from .session import SessionSource
|
||||
from .dead_targets import DeadTargetRegistry
|
||||
|
||||
|
||||
def _looks_like_telegram_private_chat_id(chat_id: Optional[str]) -> bool:
|
||||
|
|
@ -96,6 +97,32 @@ def _is_thread_not_found_delivery_error(result: Any) -> bool:
|
|||
return bool(error and "thread not found" in error.lower())
|
||||
|
||||
|
||||
def _send_result_error_kind(result: Any) -> Optional[str]:
|
||||
"""Return the machine-readable error_kind from a SendResult/dict, if any."""
|
||||
if isinstance(result, dict):
|
||||
kind = result.get("error_kind")
|
||||
else:
|
||||
kind = getattr(result, "error_kind", None)
|
||||
return str(kind) if kind else None
|
||||
|
||||
|
||||
def _classify_dead_from_error_text(error_text: Optional[str]) -> Optional[str]:
|
||||
"""Best-effort dead-target classification from a raised error's text.
|
||||
|
||||
``_deliver_to_platform`` raises (it does not return a SendResult) on a hard
|
||||
failure, so the ``deliver()`` loop only has the exception string. Reuse the
|
||||
platform-neutral classifier to recover the error_kind from that text.
|
||||
"""
|
||||
if not error_text:
|
||||
return None
|
||||
try:
|
||||
from .platforms.base import classify_send_error
|
||||
except Exception: # pragma: no cover - import guard
|
||||
return None
|
||||
kind = classify_send_error(None, error_text=error_text)
|
||||
return kind if DeadTargetRegistry.is_dead_error_kind(kind) else None
|
||||
|
||||
|
||||
@dataclass
|
||||
class DeliveryTarget:
|
||||
"""
|
||||
|
|
@ -185,17 +212,21 @@ class DeliveryRouter:
|
|||
messages to the right platform adapters.
|
||||
"""
|
||||
|
||||
def __init__(self, config: GatewayConfig,
             adapters: Optional[Dict[Platform, Any]] = None,
             dead_targets: Optional[DeadTargetRegistry] = None):
    """
    Initialize the delivery router.

    Args:
        config: Gateway configuration
        adapters: Dict mapping platforms to their adapter instances
        dead_targets: Optional shared registry of confirmed-unreachable
            targets. When omitted, a profile-local registry is created.
    """
    self.config = config
    self.adapters = adapters or {}
    self.output_dir = get_hermes_home() / "cron" / "output"
    # Explicit None check: an injected registry is always honoured, even if
    # its type ever defines a falsy truth value.
    self.dead_targets = DeadTargetRegistry() if dead_targets is None else dead_targets
|
||||
|
||||
async def deliver(
|
||||
self,
|
||||
|
|
@ -221,17 +252,50 @@ class DeliveryRouter:
|
|||
results = {}
|
||||
|
||||
for target in targets:
|
||||
# Skip targets we've already proven permanently unreachable
|
||||
# (deleted group, blocked/kicked bot, deactivated user). Re-sending
|
||||
# to them on every tick wastes a send against flood control and
|
||||
# spams logs. Self-healing: a later successful send clears the flag.
|
||||
# LOCAL/origin-without-chat targets are never dead-tracked.
|
||||
if (
|
||||
target.platform != Platform.LOCAL
|
||||
and target.chat_id
|
||||
and self.dead_targets.is_dead(target.platform.value, target.chat_id)
|
||||
):
|
||||
logger.info(
|
||||
"Skipping delivery to known-dead target %s:%s "
|
||||
"(send to it again to clear)",
|
||||
target.platform.value, target.chat_id,
|
||||
)
|
||||
results[target.to_string()] = {
|
||||
"success": False,
|
||||
"skipped": "dead_target",
|
||||
"error": "target previously confirmed unreachable",
|
||||
}
|
||||
continue
|
||||
try:
|
||||
if target.platform == Platform.LOCAL:
|
||||
result = self._deliver_local(content, job_id, job_name, metadata)
|
||||
else:
|
||||
result = await self._deliver_to_platform(target, content, metadata)
|
||||
# Successful platform delivery — clear any stale dead flag.
|
||||
if target.chat_id and not _send_result_failed(result):
|
||||
self.dead_targets.clear(target.platform.value, target.chat_id)
|
||||
|
||||
results[target.to_string()] = {
|
||||
"success": True,
|
||||
"result": result
|
||||
}
|
||||
except Exception as e:
|
||||
# A hard failure raises here. If the platform reported a
|
||||
# whole-chat death, record it so future deliveries short-circuit.
|
||||
if target.platform != Platform.LOCAL and target.chat_id:
|
||||
dead_kind = _classify_dead_from_error_text(str(e))
|
||||
if dead_kind:
|
||||
self.dead_targets.mark_dead(
|
||||
target.platform.value, target.chat_id,
|
||||
reason=f"{dead_kind}: {str(e)[:120]}",
|
||||
)
|
||||
results[target.to_string()] = {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
|
|
|
|||
BIN
infographic/dead-delivery-targets/infographic.png
Normal file
BIN
infographic/dead-delivery-targets/infographic.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 1.2 MiB |
169
tests/gateway/test_dead_targets.py
Normal file
169
tests/gateway/test_dead_targets.py
Normal file
|
|
@ -0,0 +1,169 @@
|
|||
"""Tests for confirmed-dead delivery-target short-circuiting (deleted Telegram
|
||||
groups, blocked/kicked bots, deactivated users).
|
||||
|
||||
Covers the full lifecycle through the real ``DeliveryRouter.deliver()`` path:
|
||||
forbidden send -> target marked dead
|
||||
next delivery -> short-circuited (adapter never called)
|
||||
successful send -> dead flag cleared (self-healing)
|
||||
|
||||
and the standalone ``DeadTargetRegistry`` persistence/classification contract.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import GatewayConfig, Platform
|
||||
from gateway.delivery import DeliveryRouter, DeliveryTarget
|
||||
from gateway.dead_targets import DeadTargetRegistry
|
||||
|
||||
|
||||
class ForbiddenThenOkAdapter:
    """Adapter double: the first ``fail_times`` sends raise a deleted-group
    Forbidden error; every later send succeeds."""

    def __init__(self, fail_times=1):
        self._fail_times = fail_times
        # Chat ids of every send attempt, in call order.
        self.calls = []

    async def send(self, chat_id, content, metadata=None):
        self.calls.append(chat_id)
        attempt = len(self.calls)
        if attempt > self._fail_times:
            return {"success": True}
        raise RuntimeError("Forbidden: the group chat was deleted")
|
||||
|
||||
|
||||
class TransientFailAdapter:
    """Adapter double whose every send fails with a timeout-style error."""

    async def send(self, chat_id, content, metadata=None):
        timeout_message = "httpx.ReadTimeout: connection timed out"
        raise RuntimeError(timeout_message)
|
||||
|
||||
|
||||
@pytest.fixture
def isolate(tmp_path, monkeypatch):
    """Point both gateway modules' ``get_hermes_home`` at a per-test temp dir.

    Returns the temp dir so tests can inspect or pre-seed the on-disk store.
    """
    for module_path in ("gateway.delivery", "gateway.dead_targets"):
        monkeypatch.setattr(f"{module_path}.get_hermes_home", lambda: tmp_path)
    return tmp_path
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# DeadTargetRegistry unit contract
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
class TestDeadTargetRegistry:
    """Unit contract of ``DeadTargetRegistry`` against a temp HERMES_HOME."""

    def test_mark_is_dead_clear_roundtrip(self, isolate):
        registry = DeadTargetRegistry()
        assert registry.is_dead("telegram", "123") is False

        newly_added = registry.mark_dead("telegram", "123", "forbidden")
        assert newly_added is True
        assert registry.is_dead("telegram", "123") is True

        # idempotent: second mark returns False (already present)
        marked_again = registry.mark_dead("telegram", "123", "forbidden")
        assert marked_again is False

        was_set = registry.clear("telegram", "123")
        assert was_set is True
        assert registry.is_dead("telegram", "123") is False

    def test_persists_across_instances(self, isolate):
        writer = DeadTargetRegistry()
        writer.mark_dead("telegram", "999", "deleted group")

        # A fresh instance reads the same on-disk store under tmp HERMES_HOME.
        reader = DeadTargetRegistry()
        assert reader.is_dead("telegram", "999") is True

    def test_key_is_case_insensitive_on_platform(self, isolate):
        registry = DeadTargetRegistry()
        registry.mark_dead("TeleGram", "5", "x")
        assert registry.is_dead("telegram", "5") is True

    def test_none_chat_id_is_never_dead(self, isolate):
        registry = DeadTargetRegistry()
        assert registry.mark_dead("telegram", None) is False
        assert registry.is_dead("telegram", None) is False

    def test_is_dead_error_kind_classification(self):
        for dead_kind in ("forbidden", "not_found"):
            assert DeadTargetRegistry.is_dead_error_kind(dead_kind) is True
        for other_kind in ("rate_limited", "transient", None):
            assert DeadTargetRegistry.is_dead_error_kind(other_kind) is False

    def test_corrupt_store_degrades_to_empty(self, isolate):
        store = isolate / "gateway" / "dead_targets.json"
        store.parent.mkdir(parents=True, exist_ok=True)
        store.write_text("{ this is not json")

        registry = DeadTargetRegistry()  # must not raise
        assert registry.all_dead() == {}
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# DeliveryRouter end-to-end lifecycle
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
async def test_forbidden_marks_target_dead_then_short_circuits(isolate):
    """A Forbidden failure flags the target; the next delivery skips the adapter."""
    always_forbidden = ForbiddenThenOkAdapter(fail_times=99)
    router = DeliveryRouter(
        GatewayConfig(), adapters={Platform.TELEGRAM: always_forbidden}
    )
    target = DeliveryTarget.parse("telegram:42")

    # First delivery: the send raises Forbidden, so it fails and the target
    # is recorded as dead after exactly one adapter call.
    first = await router.deliver("hi", [target])
    assert first["telegram:42"]["success"] is False
    assert router.dead_targets.is_dead("telegram", "42") is True
    assert always_forbidden.calls == ["42"]

    # Second delivery: short-circuited before the adapter is reached.
    second = await router.deliver("hi again", [target])
    outcome = second["telegram:42"]
    assert outcome["skipped"] == "dead_target"
    assert outcome["success"] is False
    assert always_forbidden.calls == ["42"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_successful_send_clears_dead_flag(isolate):
    """After the flag is removed, a successful delivery leaves it removed.

    The flag is cleared by hand here (simulating the user re-adding the bot)
    before the second delivery is attempted.
    """
    # Fails once (which marks the target dead), then succeeds.
    flaky = ForbiddenThenOkAdapter(fail_times=1)
    router = DeliveryRouter(GatewayConfig(), adapters={Platform.TELEGRAM: flaky})
    target = DeliveryTarget.parse("telegram:7")
    registry = router.dead_targets

    # Seed the dead flag through the first, failing delivery.
    await router.deliver("a", [target])
    assert registry.is_dead("telegram", "7") is True

    # Clear manually, then deliver again.
    registry.clear("telegram", "7")
    outcome = await router.deliver("b", [target])
    assert outcome["telegram:7"]["success"] is True

    # The flag stays cleared after the successful send.
    assert registry.is_dead("telegram", "7") is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_transient_failure_does_not_mark_dead(isolate):
    """A timeout-style failure is reported but never flags the chat as dead."""
    router = DeliveryRouter(
        GatewayConfig(), adapters={Platform.TELEGRAM: TransientFailAdapter()}
    )
    target = DeliveryTarget.parse("telegram:13")

    outcome = await router.deliver("hi", [target])

    assert outcome["telegram:13"]["success"] is False
    # A timeout/transient error must NOT mark the chat dead — it may recover.
    assert router.dead_targets.is_dead("telegram", "13") is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_local_target_is_never_dead_tracked(isolate):
    """Delivering to the local target succeeds and records nothing as dead."""
    router = DeliveryRouter(GatewayConfig(), adapters={})
    local_target = DeliveryTarget.parse("local")

    outcome = await router.deliver("hi", [local_target])

    assert outcome["local"]["success"] is True
    assert router.dead_targets.all_dead() == {}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_shared_registry_is_used_when_injected(isolate):
    """An injected registry's existing flag short-circuits before any send."""
    shared_registry = DeadTargetRegistry()
    shared_registry.mark_dead("telegram", "500", "pre-existing")

    # This adapter would succeed on every send if it were ever called.
    healthy_adapter = ForbiddenThenOkAdapter(fail_times=0)
    router = DeliveryRouter(
        GatewayConfig(),
        adapters={Platform.TELEGRAM: healthy_adapter},
        dead_targets=shared_registry,
    )

    outcome = await router.deliver("hi", [DeliveryTarget.parse("telegram:500")])

    assert outcome["telegram:500"]["skipped"] == "dead_target"
    assert healthy_adapter.calls == []
|
||||
Loading…
Add table
Add a link
Reference in a new issue