mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): ignore redelivered /restart after PTB offset ACK fails (#11940)
When a Telegram /restart fires and PTB's graceful-shutdown `get_updates`
ACK call times out ("When polling for updates is restarted, updates may
be received twice" in gateway.log), the new gateway receives the same
/restart again and restarts a second time — a self-perpetuating loop.
Record the triggering update_id in `.restart_last_processed.json` when
handling /restart. On the next process, reject a /restart whose
update_id <= the recorded one as a stale redelivery. 5-minute staleness
guard so an orphaned marker can't block a legitimately new /restart.
- gateway/platforms/base.py: add `platform_update_id` to MessageEvent
- gateway/platforms/telegram.py: propagate `update.update_id` through
_build_message_event for text/command/location/media handlers
- gateway/run.py: write dedup marker in _handle_restart_command;
_is_stale_restart_redelivery checks it before processing /restart
- tests/gateway/test_restart_redelivery_dedup.py: 9 new tests covering
fresh restart, redelivery, staleness window, cross-platform,
malformed-marker resilience, and no-update_id (CLI) bypass
Only active for Telegram today (the one platform with monotonic
cross-session update ordering); other platforms return False from
_is_stale_restart_redelivery and proceed normally.
This commit is contained in:
parent
c5c0bb9a73
commit
45acd9beb5
4 changed files with 366 additions and 6 deletions
|
|
@ -670,6 +670,15 @@ class MessageEvent:
|
|||
raw_message: Any = None
|
||||
message_id: Optional[str] = None
|
||||
|
||||
# Platform-specific update identifier. For Telegram this is the
|
||||
# ``update_id`` from the PTB Update wrapper; other platforms currently
|
||||
# ignore it. Used by ``/restart`` to record the triggering update so the
|
||||
# new gateway can advance the Telegram offset past it and avoid processing
|
||||
# the same ``/restart`` twice if PTB's graceful-shutdown ACK times out
|
||||
# ("Error while calling `get_updates` one more time to mark all fetched
|
||||
# updates" in gateway.log).
|
||||
platform_update_id: Optional[int] = None
|
||||
|
||||
# Media attachments
|
||||
# media_urls: local file paths (for vision tool access)
|
||||
media_urls: List[str] = field(default_factory=list)
|
||||
|
|
|
|||
|
|
@ -2326,7 +2326,7 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
if not self._should_process_message(update.message):
|
||||
return
|
||||
|
||||
event = self._build_message_event(update.message, MessageType.TEXT)
|
||||
event = self._build_message_event(update.message, MessageType.TEXT, update_id=update.update_id)
|
||||
event.text = self._clean_bot_trigger_text(event.text)
|
||||
self._enqueue_text_event(event)
|
||||
|
||||
|
|
@ -2337,7 +2337,7 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
if not self._should_process_message(update.message, is_command=True):
|
||||
return
|
||||
|
||||
event = self._build_message_event(update.message, MessageType.COMMAND)
|
||||
event = self._build_message_event(update.message, MessageType.COMMAND, update_id=update.update_id)
|
||||
await self.handle_message(event)
|
||||
|
||||
async def _handle_location_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
|
|
@ -2373,7 +2373,7 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
parts.append(f"Map: https://www.google.com/maps/search/?api=1&query={lat},{lon}")
|
||||
parts.append("Ask what they'd like to find nearby (restaurants, cafes, etc.) and any preferences.")
|
||||
|
||||
event = self._build_message_event(msg, MessageType.LOCATION)
|
||||
event = self._build_message_event(msg, MessageType.LOCATION, update_id=update.update_id)
|
||||
event.text = "\n".join(parts)
|
||||
await self.handle_message(event)
|
||||
|
||||
|
|
@ -2524,7 +2524,7 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
else:
|
||||
msg_type = MessageType.DOCUMENT
|
||||
|
||||
event = self._build_message_event(msg, msg_type)
|
||||
event = self._build_message_event(msg, msg_type, update_id=update.update_id)
|
||||
|
||||
# Add caption as text
|
||||
if msg.caption:
|
||||
|
|
@ -2863,8 +2863,19 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
self.name, cache_key, thread_id,
|
||||
)
|
||||
|
||||
def _build_message_event(self, message: Message, msg_type: MessageType) -> MessageEvent:
|
||||
"""Build a MessageEvent from a Telegram message."""
|
||||
def _build_message_event(
|
||||
self,
|
||||
message: Message,
|
||||
msg_type: MessageType,
|
||||
update_id: Optional[int] = None,
|
||||
) -> MessageEvent:
|
||||
"""Build a MessageEvent from a Telegram message.
|
||||
|
||||
``update_id`` is the ``Update.update_id`` from PTB; passing it through
|
||||
lets ``/restart`` record the triggering offset so the new gateway
|
||||
process can advance past it (prevents ``/restart`` being re-delivered
|
||||
when PTB's graceful-shutdown ACK fails).
|
||||
"""
|
||||
chat = message.chat
|
||||
user = message.from_user
|
||||
|
||||
|
|
@ -2943,6 +2954,7 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
source=source,
|
||||
raw_message=message,
|
||||
message_id=str(message.message_id),
|
||||
platform_update_id=update_id,
|
||||
reply_to_message_id=reply_to_id,
|
||||
reply_to_text=reply_to_text,
|
||||
auto_skill=topic_skill,
|
||||
|
|
|
|||
|
|
@ -4738,6 +4738,26 @@ class GatewayRunner:
|
|||
|
||||
async def _handle_restart_command(self, event: MessageEvent) -> str:
|
||||
"""Handle /restart command - drain active work, then restart the gateway."""
|
||||
# Defensive idempotency check: if the previous gateway process
|
||||
# recorded this same /restart (same platform + update_id) and the new
|
||||
# process is seeing it *again*, this is a re-delivery caused by PTB's
|
||||
# graceful-shutdown `get_updates` ACK failing on the way out ("Error
|
||||
# while calling `get_updates` one more time to mark all fetched
|
||||
# updates. Suppressing error to ensure graceful shutdown. When
|
||||
# polling for updates is restarted, updates may be received twice."
|
||||
# in gateway.log). Ignoring the stale redelivery prevents a
|
||||
# self-perpetuating restart loop where every fresh gateway
|
||||
# re-processes the same /restart command and immediately restarts
|
||||
# again.
|
||||
if self._is_stale_restart_redelivery(event):
|
||||
logger.info(
|
||||
"Ignoring redelivered /restart (platform=%s, update_id=%s) — "
|
||||
"already processed by a previous gateway instance.",
|
||||
event.source.platform.value if event.source and event.source.platform else "?",
|
||||
event.platform_update_id,
|
||||
)
|
||||
return ""
|
||||
|
||||
if self._restart_requested or self._draining:
|
||||
count = self._running_agent_count()
|
||||
if count:
|
||||
|
|
@ -4760,6 +4780,26 @@ class GatewayRunner:
|
|||
except Exception as e:
|
||||
logger.debug("Failed to write restart notify file: %s", e)
|
||||
|
||||
# Record the triggering platform + update_id in a dedicated dedup
|
||||
# marker. Unlike .restart_notify.json (which gets unlinked once the
|
||||
# new gateway sends the "gateway restarted" notification), this
|
||||
# marker persists so the new gateway can still detect a delayed
|
||||
# /restart redelivery from Telegram. Overwritten on every /restart.
|
||||
try:
|
||||
import json as _json
|
||||
import time as _time
|
||||
dedup_data = {
|
||||
"platform": event.source.platform.value if event.source.platform else None,
|
||||
"requested_at": _time.time(),
|
||||
}
|
||||
if event.platform_update_id is not None:
|
||||
dedup_data["update_id"] = event.platform_update_id
|
||||
(_hermes_home / ".restart_last_processed.json").write_text(
|
||||
_json.dumps(dedup_data)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Failed to write restart dedup marker: %s", e)
|
||||
|
||||
active_agents = self._running_agent_count()
|
||||
# When running under a service manager (systemd/launchd), use the
|
||||
# service restart path: exit with code 75 so the service manager
|
||||
|
|
@ -4775,6 +4815,58 @@ class GatewayRunner:
|
|||
return f"⏳ Draining {active_agents} active agent(s) before restart..."
|
||||
return "♻ Restarting gateway. If you aren't notified within 60 seconds, restart from the console with `hermes gateway restart`."
|
||||
|
||||
def _is_stale_restart_redelivery(self, event: MessageEvent) -> bool:
|
||||
"""Return True if this /restart is a Telegram re-delivery we already handled.
|
||||
|
||||
The previous gateway wrote ``.restart_last_processed.json`` with the
|
||||
triggering platform + update_id when it processed the /restart. If
|
||||
we now see a /restart on the same platform with an update_id <= that
|
||||
recorded value AND the marker is recent (< 5 minutes), it's a
|
||||
redelivery and should be ignored.
|
||||
|
||||
Only applies to Telegram today (the only platform that exposes a
|
||||
numeric cross-session update ordering); other platforms return False.
|
||||
"""
|
||||
if event is None or event.source is None:
|
||||
return False
|
||||
if event.platform_update_id is None:
|
||||
return False
|
||||
if event.source.platform is None:
|
||||
return False
|
||||
# Only Telegram populates platform_update_id currently; be explicit
|
||||
# so future platforms aren't accidentally gated by this check.
|
||||
try:
|
||||
platform_value = event.source.platform.value
|
||||
except Exception:
|
||||
return False
|
||||
if platform_value != "telegram":
|
||||
return False
|
||||
|
||||
try:
|
||||
import json as _json
|
||||
import time as _time
|
||||
marker_path = _hermes_home / ".restart_last_processed.json"
|
||||
if not marker_path.exists():
|
||||
return False
|
||||
data = _json.loads(marker_path.read_text())
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
if data.get("platform") != platform_value:
|
||||
return False
|
||||
recorded_uid = data.get("update_id")
|
||||
if not isinstance(recorded_uid, int):
|
||||
return False
|
||||
# Staleness guard: ignore markers older than 5 minutes. A legitimately
|
||||
# old marker (e.g. crash recovery where notify never fired) should not
|
||||
# swallow a fresh /restart from the user.
|
||||
requested_at = data.get("requested_at")
|
||||
if isinstance(requested_at, (int, float)):
|
||||
if _time.time() - requested_at > 300:
|
||||
return False
|
||||
return event.platform_update_id <= recorded_uid
|
||||
|
||||
|
||||
async def _handle_help_command(self, event: MessageEvent) -> str:
|
||||
"""Handle /help command - list available commands."""
|
||||
from hermes_cli.commands import gateway_help_lines
|
||||
|
|
|
|||
247
tests/gateway/test_restart_redelivery_dedup.py
Normal file
247
tests/gateway/test_restart_redelivery_dedup.py
Normal file
|
|
@ -0,0 +1,247 @@
|
|||
"""Tests for /restart idempotency guard against Telegram update re-delivery.
|
||||
|
||||
When PTB's graceful-shutdown ACK call (the final `get_updates` on exit) fails
|
||||
with a network error, Telegram re-delivers the `/restart` message to the new
|
||||
gateway process. Without a dedup guard, the new gateway would process
|
||||
`/restart` again and immediately restart — a self-perpetuating loop.
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
import gateway.run as gateway_run
|
||||
from gateway.platforms.base import MessageEvent, MessageType
|
||||
from tests.gateway.restart_test_helpers import make_restart_runner, make_restart_source
|
||||
|
||||
|
||||
def _make_restart_event(update_id: int | None = 100) -> MessageEvent:
|
||||
return MessageEvent(
|
||||
text="/restart",
|
||||
message_type=MessageType.TEXT,
|
||||
source=make_restart_source(),
|
||||
message_id="m1",
|
||||
platform_update_id=update_id,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_restart_handler_writes_dedup_marker_with_update_id(tmp_path, monkeypatch):
|
||||
"""First /restart writes .restart_last_processed.json with the triggering update_id."""
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.delenv("INVOCATION_ID", raising=False)
|
||||
|
||||
runner, _adapter = make_restart_runner()
|
||||
runner.request_restart = MagicMock(return_value=True)
|
||||
|
||||
event = _make_restart_event(update_id=12345)
|
||||
result = await runner._handle_restart_command(event)
|
||||
|
||||
assert "Restarting gateway" in result
|
||||
marker_path = tmp_path / ".restart_last_processed.json"
|
||||
assert marker_path.exists()
|
||||
data = json.loads(marker_path.read_text())
|
||||
assert data["platform"] == "telegram"
|
||||
assert data["update_id"] == 12345
|
||||
assert isinstance(data["requested_at"], (int, float))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_redelivered_restart_with_same_update_id_is_ignored(tmp_path, monkeypatch):
|
||||
"""A /restart with update_id <= recorded marker is silently ignored as a redelivery."""
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.delenv("INVOCATION_ID", raising=False)
|
||||
|
||||
# Previous gateway recorded update_id=12345 a few seconds ago
|
||||
marker = tmp_path / ".restart_last_processed.json"
|
||||
marker.write_text(json.dumps({
|
||||
"platform": "telegram",
|
||||
"update_id": 12345,
|
||||
"requested_at": time.time() - 5,
|
||||
}))
|
||||
|
||||
runner, _adapter = make_restart_runner()
|
||||
runner.request_restart = MagicMock()
|
||||
|
||||
event = _make_restart_event(update_id=12345) # same update_id → redelivery
|
||||
result = await runner._handle_restart_command(event)
|
||||
|
||||
assert result == "" # silently ignored
|
||||
runner.request_restart.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_redelivered_restart_with_older_update_id_is_ignored(tmp_path, monkeypatch):
|
||||
"""update_id strictly LESS than the recorded one is also a redelivery."""
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.delenv("INVOCATION_ID", raising=False)
|
||||
|
||||
marker = tmp_path / ".restart_last_processed.json"
|
||||
marker.write_text(json.dumps({
|
||||
"platform": "telegram",
|
||||
"update_id": 12345,
|
||||
"requested_at": time.time() - 5,
|
||||
}))
|
||||
|
||||
runner, _adapter = make_restart_runner()
|
||||
runner.request_restart = MagicMock()
|
||||
|
||||
event = _make_restart_event(update_id=12344) # older update — shouldn't happen,
|
||||
# but if Telegram does re-deliver
|
||||
# something older, treat as stale
|
||||
result = await runner._handle_restart_command(event)
|
||||
|
||||
assert result == ""
|
||||
runner.request_restart.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fresh_restart_with_higher_update_id_is_processed(tmp_path, monkeypatch):
|
||||
"""A NEW /restart from the user (higher update_id) bypasses the dedup guard."""
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.delenv("INVOCATION_ID", raising=False)
|
||||
|
||||
# Previous restart recorded update_id=12345
|
||||
marker = tmp_path / ".restart_last_processed.json"
|
||||
marker.write_text(json.dumps({
|
||||
"platform": "telegram",
|
||||
"update_id": 12345,
|
||||
"requested_at": time.time() - 5,
|
||||
}))
|
||||
|
||||
runner, _adapter = make_restart_runner()
|
||||
runner.request_restart = MagicMock(return_value=True)
|
||||
|
||||
event = _make_restart_event(update_id=12346) # strictly higher → fresh
|
||||
result = await runner._handle_restart_command(event)
|
||||
|
||||
assert "Restarting gateway" in result
|
||||
runner.request_restart.assert_called_once()
|
||||
|
||||
# Marker is overwritten with the new update_id
|
||||
data = json.loads(marker.read_text())
|
||||
assert data["update_id"] == 12346
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stale_marker_older_than_5min_does_not_block(tmp_path, monkeypatch):
|
||||
"""A marker older than the 5-minute window is ignored — fresh /restart proceeds."""
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.delenv("INVOCATION_ID", raising=False)
|
||||
|
||||
marker = tmp_path / ".restart_last_processed.json"
|
||||
marker.write_text(json.dumps({
|
||||
"platform": "telegram",
|
||||
"update_id": 12345,
|
||||
"requested_at": time.time() - 600, # 10 minutes ago
|
||||
}))
|
||||
|
||||
runner, _adapter = make_restart_runner()
|
||||
runner.request_restart = MagicMock(return_value=True)
|
||||
|
||||
# Same update_id as the stale marker, but the marker is too old to trust
|
||||
event = _make_restart_event(update_id=12345)
|
||||
result = await runner._handle_restart_command(event)
|
||||
|
||||
assert "Restarting gateway" in result
|
||||
runner.request_restart.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_marker_file_allows_restart(tmp_path, monkeypatch):
|
||||
"""Clean gateway start (no prior marker) processes /restart normally."""
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.delenv("INVOCATION_ID", raising=False)
|
||||
|
||||
runner, _adapter = make_restart_runner()
|
||||
runner.request_restart = MagicMock(return_value=True)
|
||||
|
||||
event = _make_restart_event(update_id=100)
|
||||
result = await runner._handle_restart_command(event)
|
||||
|
||||
assert "Restarting gateway" in result
|
||||
runner.request_restart.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_corrupt_marker_file_is_treated_as_absent(tmp_path, monkeypatch):
|
||||
"""Malformed JSON in the marker file doesn't crash — /restart proceeds."""
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.delenv("INVOCATION_ID", raising=False)
|
||||
|
||||
marker = tmp_path / ".restart_last_processed.json"
|
||||
marker.write_text("not-json{")
|
||||
|
||||
runner, _adapter = make_restart_runner()
|
||||
runner.request_restart = MagicMock(return_value=True)
|
||||
|
||||
event = _make_restart_event(update_id=100)
|
||||
result = await runner._handle_restart_command(event)
|
||||
|
||||
assert "Restarting gateway" in result
|
||||
runner.request_restart.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_event_without_update_id_bypasses_dedup(tmp_path, monkeypatch):
|
||||
"""Events with no platform_update_id (non-Telegram, CLI fallback) aren't gated."""
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.delenv("INVOCATION_ID", raising=False)
|
||||
|
||||
marker = tmp_path / ".restart_last_processed.json"
|
||||
marker.write_text(json.dumps({
|
||||
"platform": "telegram",
|
||||
"update_id": 999999,
|
||||
"requested_at": time.time(),
|
||||
}))
|
||||
|
||||
runner, _adapter = make_restart_runner()
|
||||
runner.request_restart = MagicMock(return_value=True)
|
||||
|
||||
# No update_id — the dedup check should NOT kick in
|
||||
event = _make_restart_event(update_id=None)
|
||||
result = await runner._handle_restart_command(event)
|
||||
|
||||
assert "Restarting gateway" in result
|
||||
runner.request_restart.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_different_platform_bypasses_dedup(tmp_path, monkeypatch):
|
||||
"""Marker from Telegram doesn't block a /restart from another platform."""
|
||||
from gateway.config import Platform
|
||||
from gateway.session import SessionSource
|
||||
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.delenv("INVOCATION_ID", raising=False)
|
||||
|
||||
marker = tmp_path / ".restart_last_processed.json"
|
||||
marker.write_text(json.dumps({
|
||||
"platform": "telegram",
|
||||
"update_id": 12345,
|
||||
"requested_at": time.time(),
|
||||
}))
|
||||
|
||||
runner, _adapter = make_restart_runner()
|
||||
runner.request_restart = MagicMock(return_value=True)
|
||||
|
||||
# /restart from Discord — not a redelivery candidate
|
||||
discord_source = SessionSource(
|
||||
platform=Platform.DISCORD,
|
||||
chat_id="discord-chan",
|
||||
chat_type="dm",
|
||||
user_id="u1",
|
||||
)
|
||||
event = MessageEvent(
|
||||
text="/restart",
|
||||
message_type=MessageType.TEXT,
|
||||
source=discord_source,
|
||||
message_id="m1",
|
||||
platform_update_id=12345,
|
||||
)
|
||||
result = await runner._handle_restart_command(event)
|
||||
|
||||
assert "Restarting gateway" in result
|
||||
runner.request_restart.assert_called_once()
|
||||
Loading…
Add table
Add a link
Reference in a new issue