diff --git a/gateway/drain_control.py b/gateway/drain_control.py index 6d5d96cd521..998de85092e 100644 --- a/gateway/drain_control.py +++ b/gateway/drain_control.py @@ -17,7 +17,7 @@ Contract (presence-based, mirroring ``.restart_notify.json``): * begin-drain → write ``{HERMES_HOME}/.drain_request.json`` with ``{"action": "drain", "requested_at": , "principal": , - "epoch": }``. + "epoch": , "suppress_notification": }``. * cancel-drain → remove the marker. * The gateway watcher treats **presence of a marker stamped with the current instantiation epoch** as "external drain active": flip @@ -133,7 +133,10 @@ def drain_request_path(home: Optional[Path] = None) -> Path: def write_drain_request( - *, principal: str = "drain-control", home: Optional[Path] = None + *, + principal: str = "drain-control", + suppress_notification: bool = False, + home: Optional[Path] = None, ) -> dict[str, Any]: """Write the begin-drain marker. Returns the payload written. @@ -144,12 +147,24 @@ def write_drain_request( Stamps the marker with :func:`current_instantiation_epoch` so a marker that later survives a machine restart on the durable HERMES_HOME volume can be recognised as stale and ignored (NS-570). + + ``suppress_notification`` is a generic "be quiet on the shutdown that ends + this drain" flag. When the drain culminates in a process exit (e.g. NAS + recreates the machine for an auto-update image migration), the gateway's + shutdown path reads it via :func:`drain_notification_suppressed` and skips + the *home-channel* "gateway shutting down" broadcast — the operator-flavoured + ping that would otherwise fire on every routine auto-update, potentially + dozens of times a day. It NEVER suppresses the per-active-session interrupt + ping. The gateway stays agnostic about *why* the drain is quiet; the policy + of which drain causes set the flag lives entirely in the caller (NAS). The + field defaults False so legacy/operator drains behave exactly as before. """ payload = { "action": "drain", "requested_at": datetime.now(timezone.utc).isoformat(), "principal": principal, "epoch": current_instantiation_epoch(), + "suppress_notification": bool(suppress_notification), } atomic_json_write(drain_request_path(home), payload) return payload @@ -211,6 +226,31 @@ def drain_requested(*, home: Optional[Path] = None) -> bool: return True +def drain_notification_suppressed(*, home: Optional[Path] = None) -> bool: + """True iff an ACTIVE drain marker asks to suppress the shutdown broadcast. + + "Active" means exactly what :func:`drain_requested` means — a marker present + AND stamped with the current instantiation epoch. A stale (other-epoch) + marker that survived a machine restart on the durable HERMES_HOME volume is + ignored here just as it is for drain state (NS-570): we must never let an + orphaned marker's flag silence a *fresh* gateway's legitimate shutdown + broadcast. + + Only honours the flag when it is explicitly truthy in the marker body. A + legacy marker without the field, a corrupt/contentless ``{}`` body, or an + absent marker all read as "not suppressed" (False) — fail toward the louder, + more-visible behaviour, consistent with :func:`read_drain_request`'s + never-raise contract. The gateway's shutdown path uses this to skip ONLY the + home-channel broadcast; the per-active-session interrupt ping is unaffected. + """ + body = read_drain_request(home=home) + if body is None: + return False + if _marker_epoch_is_stale(body): + return False + return bool(body.get("suppress_notification")) + + def read_drain_request(*, home: Optional[Path] = None) -> Optional[dict[str, Any]]: """Return the marker payload, or ``None`` if absent. diff --git a/gateway/run.py b/gateway/run.py index ceaefd9a830..1164717391b 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -5106,6 +5106,32 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew logger.debug("Skipping home-channel shutdown notifications for in-chat restart") return + # Suppress ONLY the home-channel broadcast when the drain that is ending + # in this shutdown asked us to be quiet (e.g. a NAS auto-update image + # migration — drain-gated, then the machine is recreated). On the + # always-on Hermes Cloud fleet that broadcast would otherwise fire on + # every routine auto-update, spamming home channels with operator- + # flavoured "gateway shutting down" pings the user doesn't care about. + # The per-active-session interrupt pings above are deliberately NOT + # gated: on a drained shutdown they're empty by construction, and in the + # force-interrupt (deadline-exceeded) case they carry the genuinely + # useful "your task was cut off, message me to resume" hint. The flag is + # only honoured for a CURRENT-epoch marker (drain_notification_suppressed + # reuses the NS-570 staleness check), so an orphaned marker can never + # silence a fresh gateway's legitimate broadcast. + try: + from gateway.drain_control import drain_notification_suppressed + if drain_notification_suppressed(): + logger.info( + "Home-channel shutdown broadcast suppressed by drain marker " + "(suppress_notification=true)" + ) + return + except Exception as e: + # Never let the suppression check block the shutdown broadcast — + # fail toward the louder, more-visible behaviour. + logger.debug("drain_notification_suppressed check failed: %s", e) + # Snapshot adapters up front: adapter.send() can hit a fatal error # path that pops the adapter from self.adapters (see _handle_fatal # elsewhere), which would otherwise trigger diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index d2580ab626c..09dacecb9f9 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -2868,8 +2868,15 @@ async def gateway_drain(request: Request): detail=f"Unknown drain action {action!r}; expected 'drain' or 'cancel'", ) - payload = write_drain_request(principal=str(principal)) - _log.info("Gateway drain BEGIN requested by %s", principal) + payload = write_drain_request( + principal=str(principal), + suppress_notification=bool((body or {}).get("suppress_notification", False)), + ) + _log.info( + "Gateway drain BEGIN requested by %s (suppress_notification=%s)", + principal, + payload["suppress_notification"], + ) return { "ok": True, "action": "drain", @@ -2877,6 +2884,7 @@ async def gateway_drain(request: Request): # Echo so a caller polling /api/status knows the marker is now set; # the gateway watcher flips gateway_state -> draining within ~1s. "draining": drain_requested(), + "suppress_notification": payload["suppress_notification"], } diff --git a/tests/gateway/test_external_drain_control.py b/tests/gateway/test_external_drain_control.py index 006167ab7d6..d37b4222435 100644 --- a/tests/gateway/test_external_drain_control.py +++ b/tests/gateway/test_external_drain_control.py @@ -72,6 +72,64 @@ class TestMarkerContract: assert data["action"] == "drain" +class TestSuppressNotification: + """The generic suppress_notification flag on the drain marker. + + Gates ONLY the gateway's home-channel shutdown broadcast (NAS auto-update + sets it true). Default-false so legacy/operator drains behave as before. + The reader reuses the NS-570 epoch-staleness check so an orphaned marker + can never silence a fresh gateway. + """ + + def test_default_false(self, home): + payload = dc.write_drain_request(principal="nas") + assert payload["suppress_notification"] is False + assert dc.drain_notification_suppressed() is False + + def test_flag_round_trips_true(self, home): + payload = dc.write_drain_request(principal="nas", suppress_notification=True) + assert payload["suppress_notification"] is True + body = dc.read_drain_request() + assert body is not None and body["suppress_notification"] is True + assert dc.drain_notification_suppressed() is True + + def test_suppressed_false_when_no_marker(self, home): + assert dc.drain_notification_suppressed() is False + + def test_legacy_marker_without_field_not_suppressed(self, home): + # A marker written before this change has no suppress_notification key → + # must read as not-suppressed (broadcast still fires), while still being + # an active drain. + import json + + dc.drain_request_path().write_text( + json.dumps({"action": "drain", "epoch": dc.current_instantiation_epoch()}), + encoding="utf-8", + ) + assert dc.drain_requested() is True + assert dc.drain_notification_suppressed() is False + + def test_corrupt_marker_not_suppressed(self, home): + # Half-written marker → read_drain_request returns {} → no flag → not + # suppressed (fail toward the louder, visible behaviour) even though the + # drain itself stays active (fail-safe toward quiescing). + dc.drain_request_path().write_text("{not valid json", encoding="utf-8") + assert dc.drain_requested() is True + assert dc.drain_notification_suppressed() is False + + def test_stale_epoch_marker_not_suppressed(self, home, monkeypatch): + # THE NS-570 ANALOGUE for suppression: a suppress_notification:true + # marker that survived a machine restart on the durable volume must NOT + # silence the freshly-restarted gateway's legitimate shutdown broadcast. + monkeypatch.setattr(dc, "current_instantiation_epoch", lambda: "epoch-OLD") + dc.write_drain_request(principal="nas", suppress_notification=True) + assert dc.drain_notification_suppressed() is True # same epoch → honoured + + monkeypatch.setattr(dc, "current_instantiation_epoch", lambda: "epoch-NEW") + assert dc.drain_request_path().exists() is True + assert dc.drain_notification_suppressed() is False # stale → ignored + + # --------------------------------------------------------------------------- # Instantiation-epoch staleness (NS-570: orphaned marker on durable volume) # --------------------------------------------------------------------------- diff --git a/tests/gateway/test_restart_drain.py b/tests/gateway/test_restart_drain.py index e9cae47ac9f..35135cfc448 100644 --- a/tests/gateway/test_restart_drain.py +++ b/tests/gateway/test_restart_drain.py @@ -536,4 +536,71 @@ async def test_shutdown_notification_uses_persisted_origin_for_colon_ids(): await runner._notify_active_sessions_of_shutdown() assert adapter.send.await_count == 1 - assert adapter.send.await_args.args[0] == "!room123:example.org" + + +@pytest.mark.asyncio +async def test_drain_suppress_skips_home_channel_keeps_session_ping(tmp_path, monkeypatch): + """A suppress_notification drain marker mutes ONLY the home-channel broadcast. + + The per-active-session interrupt ping MUST still fire (it carries the + "your task was interrupted, message me to resume" hint). This is the core + drain-notification-suppression contract. + """ + from gateway.config import HomeChannel, Platform + import gateway.drain_control as dc + + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + runner, adapter = make_restart_runner() + # A home channel distinct from the active session's chat. + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id="home-42", + name="Ops Home", + ) + # One active session in a different chat. + runner._running_agents["agent:main:telegram:dm:999"] = MagicMock() + + # NAS auto-update drain: marker present with suppress_notification=True. + dc.write_drain_request(principal="nas", suppress_notification=True) + + await runner._notify_active_sessions_of_shutdown() + + # Exactly one send — the active-session ping to chat 999. The home-channel + # broadcast to home-42 was suppressed. + assert len(adapter.sent_calls) == 1 + sent_chat_ids = {chat_id for chat_id, _content, _meta in adapter.sent_calls} + assert "999" in sent_chat_ids + assert "home-42" not in sent_chat_ids + assert "shutting down" in adapter.sent[0] + + +@pytest.mark.asyncio +async def test_drain_without_suppress_flag_still_broadcasts_home_channel(tmp_path, monkeypatch): + """A drain marker WITHOUT the suppress flag leaves today's behaviour intact. + + Both the active-session ping AND the home-channel broadcast fire — proving + the suppression is opt-in and operator/legacy drains are unaffected. + """ + from gateway.config import HomeChannel, Platform + import gateway.drain_control as dc + + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + runner, adapter = make_restart_runner() + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id="home-42", + name="Ops Home", + ) + runner._running_agents["agent:main:telegram:dm:999"] = MagicMock() + + # Operator drain: marker present, suppress_notification defaults False. + dc.write_drain_request(principal="dashboard") + + await runner._notify_active_sessions_of_shutdown() + + sent_chat_ids = {chat_id for chat_id, _content, _meta in adapter.sent_calls} + # Both targets notified (today's behaviour preserved). + assert "999" in sent_chat_ids + assert "home-42" in sent_chat_ids diff --git a/tests/hermes_cli/test_web_server.py b/tests/hermes_cli/test_web_server.py index d4b62eb76ce..1b2caf95291 100644 --- a/tests/hermes_cli/test_web_server.py +++ b/tests/hermes_cli/test_web_server.py @@ -270,6 +270,31 @@ class TestWebServerEndpoints: assert drain_control.drain_requested() is True drain_control.clear_drain_request() + def test_gateway_drain_suppress_notification_passthrough(self): + from gateway import drain_control + + resp = self.client.post( + "/api/gateway/drain", + json={"action": "drain", "suppress_notification": True}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["suppress_notification"] is True + # The flag landed on the marker the gateway reads at shutdown. + body = drain_control.read_drain_request() + assert body is not None and body["suppress_notification"] is True + assert drain_control.drain_notification_suppressed() is True + drain_control.clear_drain_request() + + def test_gateway_drain_suppress_defaults_false(self): + from gateway import drain_control + + resp = self.client.post("/api/gateway/drain", json={"action": "drain"}) + assert resp.status_code == 200 + assert resp.json()["suppress_notification"] is False + assert drain_control.drain_notification_suppressed() is False + drain_control.clear_drain_request() + def test_gateway_drain_cancel_removes_marker(self): from gateway import drain_control