mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-30 11:52:04 +00:00
feat(gateway): suppress home-channel shutdown broadcast on flagged drains (#54824)
Add a generic suppress_notification flag to the drain-request marker. When a drain that ends in process exit (e.g. a NAS auto-update image migration on the always-on Hermes Cloud fleet) is flagged, the gateway skips ONLY the home-channel 'gateway shutting down' broadcast — the operator-flavoured ping that would otherwise fire on every routine auto-update, dozens of times a day. The per-active-session interrupt ping is ALWAYS kept: on a drained shutdown it's empty by construction, and in the force-interrupt (deadline-exceeded) case it carries the user-valuable 'your task was cut off, message me to resume' hint. The gateway stays agnostic about WHY a drain is quiet (generic boolean, not a kind enum); the policy of which drain causes set the flag lives in the caller (NAS). Default-false so legacy/operator drains behave exactly as before. The reader reuses the NS-570 epoch-staleness check so an orphaned marker on the durable volume can never silence a fresh gateway's legitimate broadcast. - drain_control.py: write_drain_request gains suppress_notification; new drain_notification_suppressed() reader (current-epoch + truthy flag). - web_server.py: /api/gateway/drain reads + echoes the flag. - run.py: _notify_active_sessions_of_shutdown skips the home-channel loop only. Tests prove: flag round-trips; home-channel suppressed when set, kept when unset; active-session ping always fires; stale/legacy/corrupt markers never suppress.
This commit is contained in:
parent
ccc92c5213
commit
b963d3238b
6 changed files with 229 additions and 5 deletions
|
|
@ -17,7 +17,7 @@ Contract (presence-based, mirroring ``.restart_notify.json``):
|
|||
|
||||
* begin-drain → write ``{HERMES_HOME}/.drain_request.json`` with
|
||||
``{"action": "drain", "requested_at": <iso>, "principal": <str>,
|
||||
"epoch": <instantiation-epoch>}``.
|
||||
"epoch": <instantiation-epoch>, "suppress_notification": <bool>}``.
|
||||
* cancel-drain → remove the marker.
|
||||
* The gateway watcher treats **presence of a marker stamped with the current
|
||||
instantiation epoch** as "external drain active": flip
|
||||
|
|
@ -133,7 +133,10 @@ def drain_request_path(home: Optional[Path] = None) -> Path:
|
|||
|
||||
|
||||
def write_drain_request(
|
||||
*, principal: str = "drain-control", home: Optional[Path] = None
|
||||
*,
|
||||
principal: str = "drain-control",
|
||||
suppress_notification: bool = False,
|
||||
home: Optional[Path] = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Write the begin-drain marker. Returns the payload written.
|
||||
|
||||
|
|
@ -144,12 +147,24 @@ def write_drain_request(
|
|||
Stamps the marker with :func:`current_instantiation_epoch` so a marker that
|
||||
later survives a machine restart on the durable HERMES_HOME volume can be
|
||||
recognised as stale and ignored (NS-570).
|
||||
|
||||
``suppress_notification`` is a generic "be quiet on the shutdown that ends
|
||||
this drain" flag. When the drain culminates in a process exit (e.g. NAS
|
||||
recreates the machine for an auto-update image migration), the gateway's
|
||||
shutdown path reads it via :func:`drain_notification_suppressed` and skips
|
||||
the *home-channel* "gateway shutting down" broadcast — the operator-flavoured
|
||||
ping that would otherwise fire on every routine auto-update, potentially
|
||||
dozens of times a day. It NEVER suppresses the per-active-session interrupt
|
||||
ping. The gateway stays agnostic about *why* the drain is quiet; the policy
|
||||
of which drain causes set the flag lives entirely in the caller (NAS). The
|
||||
field defaults False so legacy/operator drains behave exactly as before.
|
||||
"""
|
||||
payload = {
|
||||
"action": "drain",
|
||||
"requested_at": datetime.now(timezone.utc).isoformat(),
|
||||
"principal": principal,
|
||||
"epoch": current_instantiation_epoch(),
|
||||
"suppress_notification": bool(suppress_notification),
|
||||
}
|
||||
atomic_json_write(drain_request_path(home), payload)
|
||||
return payload
|
||||
|
|
@ -211,6 +226,31 @@ def drain_requested(*, home: Optional[Path] = None) -> bool:
|
|||
return True
|
||||
|
||||
|
||||
def drain_notification_suppressed(*, home: Optional[Path] = None) -> bool:
    """Report whether the active drain marker asks for a quiet shutdown.

    Returns ``True`` only when all of the following hold:

    * a drain marker can be read (:func:`read_drain_request` does not return
      ``None``);
    * the marker is not stale according to ``_marker_epoch_is_stale`` — an
      orphaned marker left on the durable HERMES_HOME volume by an earlier
      instantiation must never silence a *fresh* gateway's legitimate
      shutdown broadcast (NS-570);
    * the marker body carries a truthy ``suppress_notification`` value.

    An absent marker, a legacy marker without the field, or a contentless
    ``{}`` body all yield ``False``: the check fails toward the louder, more
    visible behaviour. The gateway's shutdown path uses the result to skip
    ONLY the home-channel broadcast; the per-active-session interrupt ping is
    unaffected.

    Args:
        home: Optional home-directory override, forwarded unchanged to
            :func:`read_drain_request`.
    """
    marker = read_drain_request(home=home)
    # Short-circuit keeps the staleness check from ever seeing ``None``.
    honoured = marker is not None and not _marker_epoch_is_stale(marker)
    if not honoured:
        return False
    # A missing key reads as ``None``, which coerces to ``False``.
    return bool(marker.get("suppress_notification"))
|
||||
|
||||
|
||||
def read_drain_request(*, home: Optional[Path] = None) -> Optional[dict[str, Any]]:
|
||||
"""Return the marker payload, or ``None`` if absent.
|
||||
|
||||
|
|
|
|||
|
|
@ -5106,6 +5106,32 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
logger.debug("Skipping home-channel shutdown notifications for in-chat restart")
|
||||
return
|
||||
|
||||
# Suppress ONLY the home-channel broadcast when the drain that is ending
|
||||
# in this shutdown asked us to be quiet (e.g. a NAS auto-update image
|
||||
# migration — drain-gated, then the machine is recreated). On the
|
||||
# always-on Hermes Cloud fleet that broadcast would otherwise fire on
|
||||
# every routine auto-update, spamming home channels with operator-
|
||||
# flavoured "gateway shutting down" pings the user doesn't care about.
|
||||
# The per-active-session interrupt pings above are deliberately NOT
|
||||
# gated: on a drained shutdown they're empty by construction, and in the
|
||||
# force-interrupt (deadline-exceeded) case they carry the genuinely
|
||||
# useful "your task was cut off, message me to resume" hint. The flag is
|
||||
# only honoured for a CURRENT-epoch marker (drain_notification_suppressed
|
||||
# reuses the NS-570 staleness check), so an orphaned marker can never
|
||||
# silence a fresh gateway's legitimate broadcast.
|
||||
try:
|
||||
from gateway.drain_control import drain_notification_suppressed
|
||||
if drain_notification_suppressed():
|
||||
logger.info(
|
||||
"Home-channel shutdown broadcast suppressed by drain marker "
|
||||
"(suppress_notification=true)"
|
||||
)
|
||||
return
|
||||
except Exception as e:
|
||||
# Never let the suppression check block the shutdown broadcast —
|
||||
# fail toward the louder, more-visible behaviour.
|
||||
logger.debug("drain_notification_suppressed check failed: %s", e)
|
||||
|
||||
# Snapshot adapters up front: adapter.send() can hit a fatal error
|
||||
# path that pops the adapter from self.adapters (see _handle_fatal
|
||||
# elsewhere), which would otherwise trigger
|
||||
|
|
|
|||
|
|
@ -2868,8 +2868,15 @@ async def gateway_drain(request: Request):
|
|||
detail=f"Unknown drain action {action!r}; expected 'drain' or 'cancel'",
|
||||
)
|
||||
|
||||
payload = write_drain_request(principal=str(principal))
|
||||
_log.info("Gateway drain BEGIN requested by %s", principal)
|
||||
payload = write_drain_request(
|
||||
principal=str(principal),
|
||||
suppress_notification=bool((body or {}).get("suppress_notification", False)),
|
||||
)
|
||||
_log.info(
|
||||
"Gateway drain BEGIN requested by %s (suppress_notification=%s)",
|
||||
principal,
|
||||
payload["suppress_notification"],
|
||||
)
|
||||
return {
|
||||
"ok": True,
|
||||
"action": "drain",
|
||||
|
|
@ -2877,6 +2884,7 @@ async def gateway_drain(request: Request):
|
|||
# Echo so a caller polling /api/status knows the marker is now set;
|
||||
# the gateway watcher flips gateway_state -> draining within ~1s.
|
||||
"draining": drain_requested(),
|
||||
"suppress_notification": payload["suppress_notification"],
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -72,6 +72,64 @@ class TestMarkerContract:
|
|||
assert data["action"] == "drain"
|
||||
|
||||
|
||||
class TestSuppressNotification:
    """Behaviour of the generic ``suppress_notification`` drain-marker flag.

    The flag gates ONLY the gateway's home-channel shutdown broadcast (NAS
    auto-update sets it true) and defaults to false so legacy/operator drains
    behave as before. The reader reuses the NS-570 epoch-staleness check, so
    an orphaned marker can never silence a fresh gateway.
    """

    def test_default_false(self, home):
        # Omitting the flag must write an explicit False and read back quiet-off.
        written = dc.write_drain_request(principal="nas")
        assert written["suppress_notification"] is False
        assert dc.drain_notification_suppressed() is False

    def test_flag_round_trips_true(self, home):
        # The flag survives write -> read and is honoured by the reader.
        written = dc.write_drain_request(principal="nas", suppress_notification=True)
        assert written["suppress_notification"] is True
        on_disk = dc.read_drain_request()
        assert on_disk is not None
        assert on_disk["suppress_notification"] is True
        assert dc.drain_notification_suppressed() is True

    def test_suppressed_false_when_no_marker(self, home):
        # No marker at all -> nothing to honour.
        assert dc.drain_notification_suppressed() is False

    def test_legacy_marker_without_field_not_suppressed(self, home):
        # A marker written before this change has no suppress_notification
        # key: it must read as not-suppressed (broadcast still fires) while
        # still counting as an active drain.
        import json

        legacy_body = {"action": "drain", "epoch": dc.current_instantiation_epoch()}
        dc.drain_request_path().write_text(json.dumps(legacy_body), encoding="utf-8")
        assert dc.drain_requested() is True
        assert dc.drain_notification_suppressed() is False

    def test_corrupt_marker_not_suppressed(self, home):
        # Half-written marker -> read_drain_request returns {} -> no flag ->
        # not suppressed (fail toward the louder, visible behaviour) even
        # though the drain itself stays active (fail-safe toward quiescing).
        marker_file = dc.drain_request_path()
        marker_file.write_text("{not valid json", encoding="utf-8")
        assert dc.drain_requested() is True
        assert dc.drain_notification_suppressed() is False

    def test_stale_epoch_marker_not_suppressed(self, home, monkeypatch):
        # THE NS-570 ANALOGUE for suppression: a suppress_notification:true
        # marker that survived a machine restart on the durable volume must
        # NOT silence the freshly-restarted gateway's legitimate broadcast.
        monkeypatch.setattr(dc, "current_instantiation_epoch", lambda: "epoch-OLD")
        dc.write_drain_request(principal="nas", suppress_notification=True)
        # Same epoch as the writer -> the flag is honoured.
        assert dc.drain_notification_suppressed() is True

        # Simulate the restart: the marker file is untouched, the epoch moves on.
        monkeypatch.setattr(dc, "current_instantiation_epoch", lambda: "epoch-NEW")
        assert dc.drain_request_path().exists() is True
        # Stale marker -> the flag is ignored.
        assert dc.drain_notification_suppressed() is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Instantiation-epoch staleness (NS-570: orphaned marker on durable volume)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -536,4 +536,71 @@ async def test_shutdown_notification_uses_persisted_origin_for_colon_ids():
|
|||
await runner._notify_active_sessions_of_shutdown()
|
||||
|
||||
assert adapter.send.await_count == 1
|
||||
assert adapter.send.await_args.args[0] == "!room123:example.org"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_drain_suppress_skips_home_channel_keeps_session_ping(tmp_path, monkeypatch):
    """A suppress_notification drain marker mutes ONLY the home-channel broadcast.

    The per-active-session interrupt ping MUST still fire (it carries the
    "your task was interrupted, message me to resume" hint). This is the core
    drain-notification-suppression contract.
    """
    from gateway.config import HomeChannel, Platform
    import gateway.drain_control as dc

    # Point the drain marker at an isolated, per-test home directory.
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))

    runner, adapter = make_restart_runner()

    # Home channel lives in a different chat from the active session below.
    ops_home = HomeChannel(
        platform=Platform.TELEGRAM,
        chat_id="home-42",
        name="Ops Home",
    )
    runner.config.platforms[Platform.TELEGRAM].home_channel = ops_home

    # Exactly one active session, in chat 999.
    runner._running_agents["agent:main:telegram:dm:999"] = MagicMock()

    # NAS auto-update drain: marker present with suppress_notification=True.
    dc.write_drain_request(principal="nas", suppress_notification=True)

    await runner._notify_active_sessions_of_shutdown()

    # Only the active-session ping to chat 999 goes out; the home-channel
    # broadcast to home-42 is suppressed.
    assert len(adapter.sent_calls) == 1
    notified = {target for target, _body, _extra in adapter.sent_calls}
    assert "999" in notified
    assert "home-42" not in notified
    assert "shutting down" in adapter.sent[0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_drain_without_suppress_flag_still_broadcasts_home_channel(tmp_path, monkeypatch):
    """A drain marker WITHOUT the suppress flag leaves today's behaviour intact.

    Both the active-session ping AND the home-channel broadcast fire, proving
    the suppression is opt-in and operator/legacy drains are unaffected.
    """
    from gateway.config import HomeChannel, Platform
    import gateway.drain_control as dc

    # Point the drain marker at an isolated, per-test home directory.
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))

    runner, adapter = make_restart_runner()

    # Same topology as the suppressed case: a home channel plus one active
    # session in a different chat.
    ops_home = HomeChannel(
        platform=Platform.TELEGRAM,
        chat_id="home-42",
        name="Ops Home",
    )
    runner.config.platforms[Platform.TELEGRAM].home_channel = ops_home
    runner._running_agents["agent:main:telegram:dm:999"] = MagicMock()

    # Operator drain: marker present, suppress_notification defaults False.
    dc.write_drain_request(principal="dashboard")

    await runner._notify_active_sessions_of_shutdown()

    # Both targets notified (today's behaviour preserved).
    notified = {target for target, _body, _extra in adapter.sent_calls}
    assert "999" in notified
    assert "home-42" in notified
|
||||
|
|
|
|||
|
|
@ -270,6 +270,31 @@ class TestWebServerEndpoints:
|
|||
assert drain_control.drain_requested() is True
|
||||
drain_control.clear_drain_request()
|
||||
|
||||
def test_gateway_drain_suppress_notification_passthrough(self):
    """POSTing ``suppress_notification: true`` is echoed and lands on the marker."""
    from gateway import drain_control

    request_body = {"action": "drain", "suppress_notification": True}
    response = self.client.post("/api/gateway/drain", json=request_body)
    assert response.status_code == 200

    # The endpoint echoes the flag back to the caller.
    echoed = response.json()
    assert echoed["suppress_notification"] is True

    # The flag landed on the marker the gateway reads at shutdown.
    marker = drain_control.read_drain_request()
    assert marker is not None
    assert marker["suppress_notification"] is True
    assert drain_control.drain_notification_suppressed() is True

    # Remove the marker so it does not leak into later tests.
    drain_control.clear_drain_request()
|
||||
|
||||
def test_gateway_drain_suppress_defaults_false(self):
    """A drain request without the flag reports and stores ``False``."""
    from gateway import drain_control

    response = self.client.post("/api/gateway/drain", json={"action": "drain"})
    assert response.status_code == 200

    echoed = response.json()
    assert echoed["suppress_notification"] is False
    assert drain_control.drain_notification_suppressed() is False

    # Remove the marker so it does not leak into later tests.
    drain_control.clear_drain_request()
|
||||
|
||||
def test_gateway_drain_cancel_removes_marker(self):
|
||||
from gateway import drain_control
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue