diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index af694a5e2..f82b1fa06 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -669,6 +669,15 @@ class MessageEvent: # Original platform data raw_message: Any = None message_id: Optional[str] = None + + # Platform-specific update identifier. For Telegram this is the + # ``update_id`` from the PTB Update wrapper; other platforms currently + # ignore it. Used by ``/restart`` to record the triggering update so the + # new gateway can advance the Telegram offset past it and avoid processing + # the same ``/restart`` twice if PTB's graceful-shutdown ACK times out + # ("Error while calling `get_updates` one more time to mark all fetched + # updates" in gateway.log). + platform_update_id: Optional[int] = None # Media attachments # media_urls: local file paths (for vision tool access) diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 5b1fef133..8df05268c 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -2326,7 +2326,7 @@ class TelegramAdapter(BasePlatformAdapter): if not self._should_process_message(update.message): return - event = self._build_message_event(update.message, MessageType.TEXT) + event = self._build_message_event(update.message, MessageType.TEXT, update_id=update.update_id) event.text = self._clean_bot_trigger_text(event.text) self._enqueue_text_event(event) @@ -2337,7 +2337,7 @@ class TelegramAdapter(BasePlatformAdapter): if not self._should_process_message(update.message, is_command=True): return - event = self._build_message_event(update.message, MessageType.COMMAND) + event = self._build_message_event(update.message, MessageType.COMMAND, update_id=update.update_id) await self.handle_message(event) async def _handle_location_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: @@ -2373,7 +2373,7 @@ class TelegramAdapter(BasePlatformAdapter): parts.append(f"Map: https://www.google.com/maps/search/?api=1&query={lat},{lon}") parts.append("Ask what they'd like to find nearby (restaurants, cafes, etc.) and any preferences.") - event = self._build_message_event(msg, MessageType.LOCATION) + event = self._build_message_event(msg, MessageType.LOCATION, update_id=update.update_id) event.text = "\n".join(parts) await self.handle_message(event) @@ -2524,7 +2524,7 @@ class TelegramAdapter(BasePlatformAdapter): else: msg_type = MessageType.DOCUMENT - event = self._build_message_event(msg, msg_type) + event = self._build_message_event(msg, msg_type, update_id=update.update_id) # Add caption as text if msg.caption: @@ -2863,8 +2863,19 @@ class TelegramAdapter(BasePlatformAdapter): self.name, cache_key, thread_id, ) - def _build_message_event(self, message: Message, msg_type: MessageType) -> MessageEvent: - """Build a MessageEvent from a Telegram message.""" + def _build_message_event( + self, + message: Message, + msg_type: MessageType, + update_id: Optional[int] = None, + ) -> MessageEvent: + """Build a MessageEvent from a Telegram message. + + ``update_id`` is the ``Update.update_id`` from PTB; passing it through + lets ``/restart`` record the triggering offset so the new gateway + process can advance past it (prevents ``/restart`` being re-delivered + when PTB's graceful-shutdown ACK fails). + """ chat = message.chat user = message.from_user @@ -2943,6 +2954,7 @@ class TelegramAdapter(BasePlatformAdapter): source=source, raw_message=message, message_id=str(message.message_id), + platform_update_id=update_id, reply_to_message_id=reply_to_id, reply_to_text=reply_to_text, auto_skill=topic_skill, diff --git a/gateway/run.py b/gateway/run.py index e09dbde26..62b813f0d 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4738,6 +4738,26 @@ class GatewayRunner: async def _handle_restart_command(self, event: MessageEvent) -> str: """Handle /restart command - drain active work, then restart the gateway.""" + # Defensive idempotency check: if the previous gateway process + # recorded this same /restart (same platform + update_id) and the new + # process is seeing it *again*, this is a re-delivery caused by PTB's + # graceful-shutdown `get_updates` ACK failing on the way out ("Error + # while calling `get_updates` one more time to mark all fetched + # updates. Suppressing error to ensure graceful shutdown. When + # polling for updates is restarted, updates may be received twice." + # in gateway.log). Ignoring the stale redelivery prevents a + # self-perpetuating restart loop where every fresh gateway + # re-processes the same /restart command and immediately restarts + # again. + if self._is_stale_restart_redelivery(event): + logger.info( + "Ignoring redelivered /restart (platform=%s, update_id=%s) — " + "already processed by a previous gateway instance.", + event.source.platform.value if event.source and event.source.platform else "?", + event.platform_update_id, + ) + return "" + if self._restart_requested or self._draining: count = self._running_agent_count() if count: @@ -4760,6 +4780,26 @@ class GatewayRunner: except Exception as e: logger.debug("Failed to write restart notify file: %s", e) + # Record the triggering platform + update_id in a dedicated dedup + # marker. Unlike .restart_notify.json (which gets unlinked once the + # new gateway sends the "gateway restarted" notification), this + # marker persists so the new gateway can still detect a delayed + # /restart redelivery from Telegram. Overwritten on every /restart. + try: + import json as _json + import time as _time + dedup_data = { + "platform": event.source.platform.value if event.source.platform else None, + "requested_at": _time.time(), + } + if event.platform_update_id is not None: + dedup_data["update_id"] = event.platform_update_id + (_hermes_home / ".restart_last_processed.json").write_text( + _json.dumps(dedup_data) + ) + except Exception as e: + logger.debug("Failed to write restart dedup marker: %s", e) + active_agents = self._running_agent_count() # When running under a service manager (systemd/launchd), use the # service restart path: exit with code 75 so the service manager @@ -4775,6 +4815,58 @@ class GatewayRunner: return f"⏳ Draining {active_agents} active agent(s) before restart..." return "♻ Restarting gateway. If you aren't notified within 60 seconds, restart from the console with `hermes gateway restart`." + def _is_stale_restart_redelivery(self, event: MessageEvent) -> bool: + """Return True if this /restart is a Telegram re-delivery we already handled. + + The previous gateway wrote ``.restart_last_processed.json`` with the + triggering platform + update_id when it processed the /restart. If + we now see a /restart on the same platform with an update_id <= that + recorded value AND the marker is recent (< 5 minutes), it's a + redelivery and should be ignored. + + Only applies to Telegram today (the only platform that exposes a + numeric cross-session update ordering); other platforms return False. + """ + if event is None or event.source is None: + return False + if event.platform_update_id is None: + return False + if event.source.platform is None: + return False + # Only Telegram populates platform_update_id currently; be explicit + # so future platforms aren't accidentally gated by this check. + try: + platform_value = event.source.platform.value + except Exception: + return False + if platform_value != "telegram": + return False + + try: + import json as _json + import time as _time + marker_path = _hermes_home / ".restart_last_processed.json" + if not marker_path.exists(): + return False + data = _json.loads(marker_path.read_text()) + except Exception: + return False + + if data.get("platform") != platform_value: + return False + recorded_uid = data.get("update_id") + if not isinstance(recorded_uid, int): + return False + # Staleness guard: ignore markers older than 5 minutes. A legitimately + # old marker (e.g. crash recovery where notify never fired) should not + # swallow a fresh /restart from the user. + requested_at = data.get("requested_at") + if isinstance(requested_at, (int, float)): + if _time.time() - requested_at > 300: + return False + return event.platform_update_id <= recorded_uid + + async def _handle_help_command(self, event: MessageEvent) -> str: """Handle /help command - list available commands.""" from hermes_cli.commands import gateway_help_lines diff --git a/tests/gateway/test_restart_redelivery_dedup.py b/tests/gateway/test_restart_redelivery_dedup.py new file mode 100644 index 000000000..aa4e4330c --- /dev/null +++ b/tests/gateway/test_restart_redelivery_dedup.py @@ -0,0 +1,247 @@ +"""Tests for /restart idempotency guard against Telegram update re-delivery. + +When PTB's graceful-shutdown ACK call (the final `get_updates` on exit) fails +with a network error, Telegram re-delivers the `/restart` message to the new +gateway process. Without a dedup guard, the new gateway would process +`/restart` again and immediately restart — a self-perpetuating loop. +""" +import asyncio +import json +import time +from unittest.mock import MagicMock + +import pytest + +import gateway.run as gateway_run +from gateway.platforms.base import MessageEvent, MessageType +from tests.gateway.restart_test_helpers import make_restart_runner, make_restart_source + + +def _make_restart_event(update_id: int | None = 100) -> MessageEvent: + return MessageEvent( + text="/restart", + message_type=MessageType.TEXT, + source=make_restart_source(), + message_id="m1", + platform_update_id=update_id, + ) + + +@pytest.mark.asyncio +async def test_restart_handler_writes_dedup_marker_with_update_id(tmp_path, monkeypatch): + """First /restart writes .restart_last_processed.json with the triggering update_id.""" + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.delenv("INVOCATION_ID", raising=False) + + runner, _adapter = make_restart_runner() + runner.request_restart = MagicMock(return_value=True) + + event = _make_restart_event(update_id=12345) + result = await runner._handle_restart_command(event) + + assert "Restarting gateway" in result + marker_path = tmp_path / ".restart_last_processed.json" + assert marker_path.exists() + data = json.loads(marker_path.read_text()) + assert data["platform"] == "telegram" + assert data["update_id"] == 12345 + assert isinstance(data["requested_at"], (int, float)) + + +@pytest.mark.asyncio +async def test_redelivered_restart_with_same_update_id_is_ignored(tmp_path, monkeypatch): + """A /restart with update_id <= recorded marker is silently ignored as a redelivery.""" + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.delenv("INVOCATION_ID", raising=False) + + # Previous gateway recorded update_id=12345 a few seconds ago + marker = tmp_path / ".restart_last_processed.json" + marker.write_text(json.dumps({ + "platform": "telegram", + "update_id": 12345, + "requested_at": time.time() - 5, + })) + + runner, _adapter = make_restart_runner() + runner.request_restart = MagicMock() + + event = _make_restart_event(update_id=12345) # same update_id → redelivery + result = await runner._handle_restart_command(event) + + assert result == "" # silently ignored + runner.request_restart.assert_not_called() + + +@pytest.mark.asyncio +async def test_redelivered_restart_with_older_update_id_is_ignored(tmp_path, monkeypatch): + """update_id strictly LESS than the recorded one is also a redelivery.""" + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.delenv("INVOCATION_ID", raising=False) + + marker = tmp_path / ".restart_last_processed.json" + marker.write_text(json.dumps({ + "platform": "telegram", + "update_id": 12345, + "requested_at": time.time() - 5, + })) + + runner, _adapter = make_restart_runner() + runner.request_restart = MagicMock() + + event = _make_restart_event(update_id=12344) # older update — shouldn't happen, + # but if Telegram does re-deliver + # something older, treat as stale + result = await runner._handle_restart_command(event) + + assert result == "" + runner.request_restart.assert_not_called() + + +@pytest.mark.asyncio +async def test_fresh_restart_with_higher_update_id_is_processed(tmp_path, monkeypatch): + """A NEW /restart from the user (higher update_id) bypasses the dedup guard.""" + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.delenv("INVOCATION_ID", raising=False) + + # Previous restart recorded update_id=12345 + marker = tmp_path / ".restart_last_processed.json" + marker.write_text(json.dumps({ + "platform": "telegram", + "update_id": 12345, + "requested_at": time.time() - 5, + })) + + runner, _adapter = make_restart_runner() + runner.request_restart = MagicMock(return_value=True) + + event = _make_restart_event(update_id=12346) # strictly higher → fresh + result = await runner._handle_restart_command(event) + + assert "Restarting gateway" in result + runner.request_restart.assert_called_once() + + # Marker is overwritten with the new update_id + data = json.loads(marker.read_text()) + assert data["update_id"] == 12346 + + +@pytest.mark.asyncio +async def test_stale_marker_older_than_5min_does_not_block(tmp_path, monkeypatch): + """A marker older than the 5-minute window is ignored — fresh /restart proceeds.""" + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.delenv("INVOCATION_ID", raising=False) + + marker = tmp_path / ".restart_last_processed.json" + marker.write_text(json.dumps({ + "platform": "telegram", + "update_id": 12345, + "requested_at": time.time() - 600, # 10 minutes ago + })) + + runner, _adapter = make_restart_runner() + runner.request_restart = MagicMock(return_value=True) + + # Same update_id as the stale marker, but the marker is too old to trust + event = _make_restart_event(update_id=12345) + result = await runner._handle_restart_command(event) + + assert "Restarting gateway" in result + runner.request_restart.assert_called_once() + + +@pytest.mark.asyncio +async def test_no_marker_file_allows_restart(tmp_path, monkeypatch): + """Clean gateway start (no prior marker) processes /restart normally.""" + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.delenv("INVOCATION_ID", raising=False) + + runner, _adapter = make_restart_runner() + runner.request_restart = MagicMock(return_value=True) + + event = _make_restart_event(update_id=100) + result = await runner._handle_restart_command(event) + + assert "Restarting gateway" in result + runner.request_restart.assert_called_once() + + +@pytest.mark.asyncio +async def test_corrupt_marker_file_is_treated_as_absent(tmp_path, monkeypatch): + """Malformed JSON in the marker file doesn't crash — /restart proceeds.""" + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.delenv("INVOCATION_ID", raising=False) + + marker = tmp_path / ".restart_last_processed.json" + marker.write_text("not-json{") + + runner, _adapter = make_restart_runner() + runner.request_restart = MagicMock(return_value=True) + + event = _make_restart_event(update_id=100) + result = await runner._handle_restart_command(event) + + assert "Restarting gateway" in result + runner.request_restart.assert_called_once() + + +@pytest.mark.asyncio +async def test_event_without_update_id_bypasses_dedup(tmp_path, monkeypatch): + """Events with no platform_update_id (non-Telegram, CLI fallback) aren't gated.""" + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.delenv("INVOCATION_ID", raising=False) + + marker = tmp_path / ".restart_last_processed.json" + marker.write_text(json.dumps({ + "platform": "telegram", + "update_id": 999999, + "requested_at": time.time(), + })) + + runner, _adapter = make_restart_runner() + runner.request_restart = MagicMock(return_value=True) + + # No update_id — the dedup check should NOT kick in + event = _make_restart_event(update_id=None) + result = await runner._handle_restart_command(event) + + assert "Restarting gateway" in result + runner.request_restart.assert_called_once() + + +@pytest.mark.asyncio +async def test_different_platform_bypasses_dedup(tmp_path, monkeypatch): + """Marker from Telegram doesn't block a /restart from another platform.""" + from gateway.config import Platform + from gateway.session import SessionSource + + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.delenv("INVOCATION_ID", raising=False) + + marker = tmp_path / ".restart_last_processed.json" + marker.write_text(json.dumps({ + "platform": "telegram", + "update_id": 12345, + "requested_at": time.time(), + })) + + runner, _adapter = make_restart_runner() + runner.request_restart = MagicMock(return_value=True) + + # /restart from Discord — not a redelivery candidate + discord_source = SessionSource( + platform=Platform.DISCORD, + chat_id="discord-chan", + chat_type="dm", + user_id="u1", + ) + event = MessageEvent( + text="/restart", + message_type=MessageType.TEXT, + source=discord_source, + message_id="m1", + platform_update_id=12345, + ) + result = await runner._handle_restart_command(event) + + assert "Restarting gateway" in result + runner.request_restart.assert_called_once()