mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-30 11:52:04 +00:00
fix(gateway): stamp drain marker with instantiation epoch so a durable-volume restart clears it (NS-570)
The external-drain marker .drain_request.json is written under HERMES_HOME, which on Hermes Cloud is a persistent Fly volume (/opt/data). A begin-drain marker therefore SURVIVES the post-update machine restart. But the disruptive lifecycle actions a drain protects (auto-update / image migrate / env edit / profile change) all restart the machine — which is exactly the signal the drain is over. The freshly-restarted gateway re-read the orphaned marker on its startup reconcile and parked itself back in 'draining', refusing every new turn indefinitely (NS-570: ~52 min until manually cleared). Fix: stamp the marker with an identity of THIS container/VM instantiation (kernel boot_id + PID 1 start time, read from /proc) and treat a marker whose epoch differs from the current instantiation as absent. A deliberate restart → new PID 1 → new epoch → stale marker ignored → gateway boots 'running'. A marker written during the current instantiation (the live drain) still matches; an s6 respawn of just the gateway (PID 1/init unchanged) keeps the same epoch, so an in-flight drain is still honoured (D4a reversibility preserved). The staleness check is lenient and never fail-closed: a legacy marker with no epoch, a corrupt/contentless marker, or an environment with no /proc (epoch unavailable) all degrade to the original presence-only behaviour. NAS is untouched — it only ever POSTs begin/cancel-drain over HTTP; the marker file is purely gateway-internal IPC. The fix is entirely within gateway/drain_control.py; the watcher and the dashboard endpoint go through the same drain_requested()/write_drain_request() chokepoints and need no functional change.
This commit is contained in:
parent
e3db1ef92d
commit
8ab7246c45
3 changed files with 229 additions and 11 deletions
|
|
@ -16,18 +16,40 @@ share one definition and can never disagree.
|
|||
Contract (presence-based, mirroring ``.restart_notify.json``):
|
||||
|
||||
* begin-drain → write ``{HERMES_HOME}/.drain_request.json`` with
|
||||
``{"action": "drain", "requested_at": <iso>, "principal": <str>}``.
|
||||
``{"action": "drain", "requested_at": <iso>, "principal": <str>,
|
||||
"epoch": <instantiation-epoch>}``.
|
||||
* cancel-drain → remove the marker.
|
||||
* The gateway watcher treats **presence** of the marker as "external drain
|
||||
active": flip ``gateway_state -> "draining"`` and stop accepting new turns.
|
||||
Absence means "not draining" (revert to ``running`` if we had flipped it).
|
||||
* The gateway watcher treats **presence of a marker stamped with the current
|
||||
instantiation epoch** as "external drain active": flip
|
||||
``gateway_state -> "draining"`` and stop accepting new turns. Absence (or a
|
||||
marker from a *prior* instantiation) means "not draining" (revert to
|
||||
``running`` if we had flipped it).
|
||||
|
||||
Why the epoch (NS-570). ``HERMES_HOME`` is a **durable** store — on Hermes
|
||||
Cloud it is a persistent Fly volume (``/opt/data``). A begin-drain marker
|
||||
written there *survives a machine restart*. But the disruptive lifecycle
|
||||
actions a drain protects (auto-update / image migrate / env edit / profile
|
||||
change) all **restart the machine**, which is exactly the signal that the drain
|
||||
is over. Without the epoch, a freshly-restarted gateway re-reads the orphaned
|
||||
marker on boot and parks itself right back in ``draining`` forever (NS-570: an
|
||||
auto-updated instance refused every turn for ~52 min). Stamping the marker with
|
||||
an identity of *this* container/VM instantiation, and ignoring a marker whose
|
||||
epoch doesn't match, makes "a deliberate restart clears the drain" true by
|
||||
construction — while a marker written during the *current* instantiation (the
|
||||
live drain) still matches, and an s6 respawn of just the gateway (PID 1 / init
|
||||
unchanged) still honours an in-flight drain.
|
||||
|
||||
Reading the marker never raises: a malformed/half-written file reads as
|
||||
"present but contentless", which the watcher still treats as drain-active
|
||||
(fail-safe toward quiescing — a corrupt begin marker must not be ignored).
|
||||
(fail-safe toward quiescing — a corrupt begin marker must not be ignored). The
|
||||
epoch check is deliberately **lenient**: it ignores a marker only on a
|
||||
*definite* epoch mismatch. A marker with no epoch (legacy/corrupt/contentless),
|
||||
or an environment where the epoch cannot be computed (non-Linux, no ``/proc``),
|
||||
both degrade to the original presence-only behaviour — never fail-closed.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
|
|
@ -42,6 +64,68 @@ _log = logging.getLogger(__name__)
|
|||
_DRAIN_REQUEST_FILENAME = ".drain_request.json"
|
||||
|
||||
|
||||
@functools.lru_cache(maxsize=1)
|
||||
def current_instantiation_epoch() -> str:
|
||||
"""Identity of THIS container / VM instantiation.
|
||||
|
||||
Stable for the life of the PID-1 init process — so an s6 respawn of just
|
||||
the gateway keeps the same epoch and an in-flight drain is honoured — but
|
||||
changes when the machine/container is recreated (a fresh PID 1 → a fresh
|
||||
epoch). Composed from two ``/proc`` facts:
|
||||
|
||||
* the kernel **boot id** (``/proc/sys/kernel/random/boot_id``) — changes
|
||||
on a VM / microVM reboot (e.g. a Fly Firecracker machine restart);
|
||||
* **PID 1's start time** (field 22 of ``/proc/1/stat``) — changes on a
|
||||
plain ``docker restart`` (the host kernel, hence boot_id, is unchanged,
|
||||
but ``/init`` is a brand-new process).
|
||||
|
||||
Together they discriminate every restart mode that matters:
|
||||
|
||||
| event | boot_id | pid1 start | epoch | marker |
|
||||
|--------------------------------|---------|------------|--------|--------|
|
||||
| Fly microVM reboot (auto-upd.) | changes | changes | NEW | reject |
|
||||
| plain ``docker restart`` | same | changes | NEW | reject |
|
||||
| s6 respawn of the gateway only | same | same | SAME | honour |
|
||||
| host ``hermes gateway restart``| same | same(init) | SAME | honour |
|
||||
|
||||
The last row is intentional: a host install has no durable-volume drain
|
||||
bug, and honouring a drain across a deliberate process restart is the
|
||||
intended reversible behaviour (D4a) — PID 1 there is the long-lived init
|
||||
(systemd/launchd), so the epoch is stable.
|
||||
|
||||
Returns ``""`` when neither identity source is readable (non-Linux, no
|
||||
``/proc``). An empty epoch disables the staleness check downstream,
|
||||
degrading to the released presence-only behaviour — never fail-closed.
|
||||
Memoised: the epoch is constant for the life of the process.
|
||||
"""
|
||||
boot_id = ""
|
||||
try:
|
||||
boot_id = (
|
||||
Path("/proc/sys/kernel/random/boot_id")
|
||||
.read_text(encoding="utf-8")
|
||||
.strip()
|
||||
)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
pid1_start = ""
|
||||
try:
|
||||
# /proc/1/stat: "<pid> (<comm>) <state> ... <starttime@field22> ...".
|
||||
# comm can contain spaces and parens, so split on the LAST ')' and
|
||||
# index into the whitespace-delimited tail. starttime is field 22
|
||||
# (1-indexed); after the comm the tail starts at field 3, so it is the
|
||||
# tail's index 19.
|
||||
stat = Path("/proc/1/stat").read_text(encoding="utf-8")
|
||||
tail = stat.rsplit(")", 1)[1].split()
|
||||
pid1_start = tail[19]
|
||||
except (OSError, IndexError):
|
||||
pass
|
||||
|
||||
if not boot_id and not pid1_start:
|
||||
return ""
|
||||
return f"{boot_id}:{pid1_start}"
|
||||
|
||||
|
||||
def drain_request_path(home: Optional[Path] = None) -> Path:
|
||||
"""Absolute path to the drain-request marker, respecting HERMES_HOME."""
|
||||
base = home if home is not None else get_hermes_home()
|
||||
|
|
@ -56,11 +140,16 @@ def write_drain_request(
|
|||
Atomic write so the gateway watcher never reads a half-written file.
|
||||
Idempotent: re-writing while a drain is already in progress just refreshes
|
||||
``requested_at`` (harmless — the watcher keys off presence, not content).
|
||||
|
||||
Stamps the marker with :func:`current_instantiation_epoch` so a marker that
|
||||
later survives a machine restart on the durable HERMES_HOME volume can be
|
||||
recognised as stale and ignored (NS-570).
|
||||
"""
|
||||
payload = {
|
||||
"action": "drain",
|
||||
"requested_at": datetime.now(timezone.utc).isoformat(),
|
||||
"principal": principal,
|
||||
"epoch": current_instantiation_epoch(),
|
||||
}
|
||||
atomic_json_write(drain_request_path(home), payload)
|
||||
return payload
|
||||
|
|
@ -82,9 +171,44 @@ def clear_drain_request(*, home: Optional[Path] = None) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
def _marker_epoch_is_stale(body: dict[str, Any]) -> bool:
|
||||
"""True iff ``body``'s epoch is a *definite* mismatch with this process.
|
||||
|
||||
Lenient by design — returns False (i.e. "not stale, honour it") whenever it
|
||||
can't be sure:
|
||||
* the current epoch can't be computed ("" fallback, no /proc), OR
|
||||
* the marker carries no epoch (legacy marker, or a corrupt/contentless
|
||||
``{}`` body).
|
||||
Only a marker whose epoch is present AND differs from the current
|
||||
instantiation epoch is considered stale. This preserves the
|
||||
fail-safe-toward-quiescing contract for malformed markers.
|
||||
"""
|
||||
current = current_instantiation_epoch()
|
||||
if not current:
|
||||
return False
|
||||
marker_epoch = body.get("epoch")
|
||||
if not marker_epoch:
|
||||
return False
|
||||
return marker_epoch != current
|
||||
|
||||
|
||||
def drain_requested(*, home: Optional[Path] = None) -> bool:
|
||||
"""True iff the begin-drain marker is present (external drain active)."""
|
||||
return drain_request_path(home).exists()
|
||||
"""True iff a begin-drain marker for THIS instantiation is present.
|
||||
|
||||
A marker whose ``epoch`` does not match the current instantiation epoch is
|
||||
treated as absent: it survived a container/VM restart (HERMES_HOME is a
|
||||
durable Fly volume on Hermes Cloud) and the lifecycle action that triggered
|
||||
the drain has already completed — honouring it would wedge the
|
||||
freshly-restarted gateway in ``draining`` (NS-570). The staleness check is
|
||||
lenient (see :func:`_marker_epoch_is_stale`): a legacy/corrupt marker with
|
||||
no epoch, or an environment without ``/proc``, still reads as drain-active.
|
||||
"""
|
||||
body = read_drain_request(home=home)
|
||||
if body is None:
|
||||
return False
|
||||
if _marker_epoch_is_stale(body):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def read_drain_request(*, home: Optional[Path] = None) -> Optional[dict[str, Any]]:
|
||||
|
|
|
|||
|
|
@ -4075,8 +4075,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
gateway/drain_control.py). Marker present -> ``_enter_external_drain``;
|
||||
marker absent -> ``_exit_external_drain``. The 1s cadence bounds the
|
||||
observe-the-marker latency the live-validation gate checks (point a).
|
||||
Reconciles once at startup so a marker that survived a restart is
|
||||
honoured immediately. Best-effort: any tick error is logged and the
|
||||
Reconciles once at startup. A marker stamped with a PRIOR
|
||||
instantiation epoch (one that survived a machine restart on the durable
|
||||
HERMES_HOME volume — NS-570) is treated as absent by ``drain_requested``
|
||||
and is NOT honoured; only a marker from the current instantiation flips
|
||||
the gateway into drain. Best-effort: any tick error is logged and the
|
||||
loop continues (a transient stat() failure must not wedge the gateway).
|
||||
"""
|
||||
from gateway.drain_control import drain_requested
|
||||
|
|
@ -6342,8 +6345,10 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
|
||||
# Start background drain-control watcher — reconciles the gateway's
|
||||
# new-turn accept-state with the external ``.drain_request.json`` marker
|
||||
# the dashboard begin/cancel-drain endpoint writes (Phase 2). Honours a
|
||||
# marker that survived a restart on its first tick.
|
||||
# the dashboard begin/cancel-drain endpoint writes (Phase 2). A marker
|
||||
# left behind by a prior instantiation (durable-volume restart, NS-570)
|
||||
# is ignored via its instantiation epoch; only a current-epoch marker
|
||||
# engages drain on the first tick.
|
||||
asyncio.create_task(self._drain_control_watcher())
|
||||
|
||||
logger.info("Press Ctrl+C to stop")
|
||||
|
|
|
|||
|
|
@ -72,6 +72,95 @@ class TestMarkerContract:
|
|||
assert data["action"] == "drain"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Instantiation-epoch staleness (NS-570: orphaned marker on durable volume)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestInstantiationEpoch:
|
||||
def test_write_stamps_current_epoch(self, home):
|
||||
payload = dc.write_drain_request(principal="nas")
|
||||
assert payload["epoch"] == dc.current_instantiation_epoch()
|
||||
body = dc.read_drain_request()
|
||||
assert body is not None and body["epoch"] == dc.current_instantiation_epoch()
|
||||
|
||||
def test_current_epoch_is_stable_within_process(self):
|
||||
# Memoised — an s6 respawn of just the gateway keeps PID 1, so a
|
||||
# repeated call inside one process must return the same value (an
|
||||
# in-flight drain stays honoured).
|
||||
assert dc.current_instantiation_epoch() == dc.current_instantiation_epoch()
|
||||
|
||||
def test_marker_from_prior_instantiation_reads_as_absent(self, home, monkeypatch):
|
||||
# THE NS-570 REGRESSION. A begin-drain marker written by a PREVIOUS
|
||||
# container/VM instantiation survives on the durable HERMES_HOME volume
|
||||
# across a machine restart. The freshly-restarted gateway (new epoch)
|
||||
# must treat it as absent, NOT re-engage drain.
|
||||
monkeypatch.setattr(dc, "current_instantiation_epoch", lambda: "epoch-OLD")
|
||||
dc.write_drain_request(principal="nas") # stamps "epoch-OLD"
|
||||
assert dc.drain_requested() is True # same epoch → active
|
||||
|
||||
# Simulate the restart: a brand-new instantiation epoch.
|
||||
monkeypatch.setattr(dc, "current_instantiation_epoch", lambda: "epoch-NEW")
|
||||
# The marker file is still physically present on the volume…
|
||||
assert dc.drain_request_path().exists() is True
|
||||
# …but it is ignored because its epoch belongs to a prior instantiation.
|
||||
assert dc.drain_requested() is False
|
||||
|
||||
def test_marker_from_current_instantiation_is_honoured(self, home, monkeypatch):
|
||||
monkeypatch.setattr(dc, "current_instantiation_epoch", lambda: "epoch-A")
|
||||
dc.write_drain_request()
|
||||
assert dc.drain_requested() is True
|
||||
|
||||
def test_legacy_marker_without_epoch_still_active(self, home):
|
||||
# A marker written before this change (no "epoch" key) must remain
|
||||
# fail-safe toward quiescing — never silently ignored.
|
||||
import json
|
||||
|
||||
dc.drain_request_path().write_text(
|
||||
json.dumps({"action": "drain", "requested_at": "x", "principal": "p"}),
|
||||
encoding="utf-8",
|
||||
)
|
||||
assert dc.drain_requested() is True
|
||||
|
||||
def test_corrupt_marker_with_no_parseable_epoch_still_active(self, home):
|
||||
# Half-written / malformed → read_drain_request returns {} → no epoch →
|
||||
# lenient check keeps it active (fail-safe), same as before the change.
|
||||
dc.drain_request_path().write_text("{not valid json", encoding="utf-8")
|
||||
assert dc.drain_requested() is True
|
||||
|
||||
def test_unavailable_epoch_disables_staleness_check(self, home, monkeypatch):
|
||||
# No /proc (non-Linux, etc.) → epoch "" → degrade to presence-only:
|
||||
# any present marker (even with a foreign epoch) reads as active rather
|
||||
# than fail-closed.
|
||||
import json
|
||||
|
||||
dc.drain_request_path().write_text(
|
||||
json.dumps({"action": "drain", "epoch": "some-other-epoch"}),
|
||||
encoding="utf-8",
|
||||
)
|
||||
monkeypatch.setattr(dc, "current_instantiation_epoch", lambda: "")
|
||||
assert dc.drain_requested() is True
|
||||
|
||||
def test_current_epoch_empty_when_proc_unreadable(self, monkeypatch):
|
||||
# When neither /proc identity source is readable, the epoch is "" so
|
||||
# the staleness check is disabled rather than crashing.
|
||||
from pathlib import Path as _P
|
||||
|
||||
orig_read_text = _P.read_text
|
||||
|
||||
def _boom(self, *a, **k):
|
||||
if str(self).startswith("/proc/"):
|
||||
raise OSError("no /proc")
|
||||
return orig_read_text(self, *a, **k)
|
||||
|
||||
dc.current_instantiation_epoch.cache_clear()
|
||||
monkeypatch.setattr(_P, "read_text", _boom)
|
||||
try:
|
||||
assert dc.current_instantiation_epoch() == ""
|
||||
finally:
|
||||
dc.current_instantiation_epoch.cache_clear()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gateway state machine (enter / exit / idempotency)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue