feat(gateway): external drain trigger + accept-gating (begin/cancel + control channel)

Tasks 2.1 + 2.2 + 2.3 of the safe-shutdown plan — the reversible
quiesce-without-restart machinery NAS drives during a lifecycle action (D4a).
These ship together because the endpoint, the control channel, and the gateway
state machine are one coherent slice.

2.2 — control channel (gateway/drain_control.py, new):
The dashboard has no HTTP path into a running gateway (guardrails: "there is NO
external control channel into a running gateway"); restart/drain is driven only
by markers the gateway reacts to. So begin/cancel-drain writes/removes a
presence-based marker .drain_request.json (HERMES_HOME-scoped, atomic write,
never-raises read; a corrupt marker reads as present-contentless → fail-safe
toward quiescing). This is Q-B option A.
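
A minimal sketch of the presence-based marker idea described above, for
illustration only. It does not reproduce gateway/drain_control.py: the helper
names (write_marker, clear_marker, marker_present, read_marker) are
hypothetical, and the temp-file-plus-rename write is an assumption standing in
for the project's own atomic_json_write helper.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Optional

MARKER_NAME = ".drain_request.json"  # filename from the commit; the helpers below are hypothetical


def write_marker(home: Path, payload: dict[str, Any]) -> None:
    """Write the marker via a temp file in the same directory, then rename it into place."""
    fd, tmp = tempfile.mkstemp(dir=str(home), prefix=".drain_request.", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(payload, fh)
        os.replace(tmp, home / MARKER_NAME)  # readers see the old file or the new one, never a partial one
    except BaseException:
        # Do not leave a stray temp file behind if the write or rename failed.
        try:
            os.unlink(tmp)
        except FileNotFoundError:
            pass
        raise


def clear_marker(home: Path) -> bool:
    """Remove the marker. Returns True if one existed; a missing file is not an error."""
    try:
        (home / MARKER_NAME).unlink()
        return True
    except FileNotFoundError:
        return False


def marker_present(home: Path) -> bool:
    """Presence alone means 'drain requested'; the content is informational."""
    return (home / MARKER_NAME).exists()


def read_marker(home: Path) -> Optional[dict[str, Any]]:
    """Return the payload, None if absent or unreadable, or {} if present but unparseable."""
    try:
        raw = (home / MARKER_NAME).read_text(encoding="utf-8")
    except OSError:
        return None
    try:
        data = json.loads(raw)
    except ValueError:
        return {}
    return data if isinstance(data, dict) else {}
```

Because the reader keys off presence rather than content, a corrupt marker
still counts as a drain request, which is the fail-safe direction.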

2.2 — gateway state machine (gateway/run.py):
- _external_drain_active flag, DISTINCT from the shutdown _draining flag: this
  one does NOT exit the process and is fully reversible.
- _enter_external_drain / _exit_external_drain: idempotent transitions that
  flip gateway_state→draining / →running via _update_runtime_status (preserving
  the live active_agents count). exit refuses to revert to running during a
  real shutdown or after the loop stops (shutdown wins).
- _drain_control_watcher: 1s background task (modelled on _handoff_watcher)
  reconciling accept-state with the marker; honours a marker that survived a
  restart on its first tick. Registered alongside the other watchers in start.
- New-turn accept gate in _handle_message, placed BEFORE the session-slot
  claim: when draining, refuse to START a new turn (so active_agents can only
  fall → no TOCTOU race), while in-flight turns finish untouched. Internal/
  system events (restart-recovery replays, bg-process completions) bypass it.
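
The transitions and the accept gate above can be sketched as a small
standalone class. This is an illustrative model under stated assumptions, not
the GatewayRunner code: DrainGate and all of its attribute and method names
are hypothetical, and the real gateway persists state through
_update_runtime_status rather than a plain attribute.

```python
class DrainGate:
    """Toy model of the reversible external drain (all names are hypothetical)."""

    def __init__(self) -> None:
        self.running = True          # the main loop is alive
        self.shutting_down = False   # one-way shutdown drain
        self.external_drain = False  # reversible, marker-driven drain
        self.state = "running"
        self.active_turns = 0

    def enter_external_drain(self) -> None:
        if self.external_drain:
            return  # idempotent
        self.external_drain = True
        self.state = "draining"

    def exit_external_drain(self) -> None:
        if not self.external_drain:
            return  # idempotent
        self.external_drain = False
        if self.shutting_down or not self.running:
            return  # shutdown wins: never flip a stopping gateway back to running
        self.state = "running"

    def reconcile(self, marker_present: bool) -> None:
        """One watcher tick: follow the marker."""
        if marker_present:
            self.enter_external_drain()
        else:
            self.exit_external_drain()

    def try_start_turn(self, internal: bool = False) -> bool:
        """Gate checked BEFORE a slot is claimed, so the count can only fall during a drain."""
        if self.external_drain and not internal:
            return False
        self.active_turns += 1
        return True

    def finish_turn(self) -> None:
        self.active_turns -= 1
```

In this model a refused turn never increments active_turns, so a caller that
polls the count during a drain cannot see it rise because of new user work.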

2.1 — endpoint (hermes_cli/web_server.py):
POST /api/gateway/drain {action: drain|cancel}. Authenticated by the Task-2.0a
token seam (the drain plugin registered this exact path as a token route);
attributes the request to the verified token principal. Begin writes the
marker, cancel removes it — the gateway process owns the actual transition.
Force-override (D6) is NOT here; it maps onto the existing immediate
/api/gateway/restart force path.
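
For orientation, a hedged sketch of how a caller such as NAS might drive this
endpoint in a request -> poll -> proceed loop. The transport is injected as
plain callables so the flow is visible without a live dashboard: drain_then_act,
post_drain, get_active_agents, and lifecycle_action are all hypothetical names,
and always cancelling at the end is an assumption of this sketch (the commit
only says the marker is removed on cancel/abort).

```python
import time
from typing import Callable


def drain_then_act(
    post_drain: Callable[[str], None],        # hypothetical: POSTs {"action": <str>} to /api/gateway/drain
    get_active_agents: Callable[[], int],     # hypothetical: reads active_agents from /api/status
    lifecycle_action: Callable[[], None],     # hypothetical: the maintenance step to run once quiesced
    *,
    timeout: float = 60.0,
    poll_interval: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Begin a drain, wait for in-flight turns to finish, act, then release the drain.

    Returns True if the action ran, False if the drain timed out first.
    """
    post_drain("drain")
    deadline = clock() + timeout
    try:
        while get_active_agents() > 0:
            if clock() >= deadline:
                return False  # still busy: give up without running the action
            sleep(poll_interval)
        lifecycle_action()
        return True
    finally:
        # Assumption of this sketch: always release, so the gateway is not left quiesced.
        post_drain("cancel")
```

Since new turns are refused as soon as the gateway observes the marker, the
polled count should only fall, which is what makes the wait loop meaningful.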

Tests (mocked — necessary-not-sufficient; the HARD live gate Q-B is next):
- tests/gateway/test_external_drain_control.py — marker contract (write/clear/
  read/corrupt/atomic), state machine (enter/exit/idempotency/shutdown-wins/
  loop-stopped), watcher reconcile-enter-then-exit, new-turn refusal, and
  in-flight-not-interrupted. 15 tests.
- tests/hermes_cli/test_web_server.py — /api/gateway/drain begin/default-begin/
  cancel/cancel-idempotent/bad-action-400. 5 tests.
- dashboard.drain_auth config section already added in 2.0b commit.

All touched suites green: 301 (gateway+auth) + 9 (web_server endpoints) passed.

Intentionally deferred:
- HARD live-validation gate (Q-B): real isolated `hermes gateway run`, drive a
  real begin-drain marker, prove the 5-point checklist a–e.
- Spec-doc status flip + Phase-2 PR.

Build status: external-drain, restart-drain, status, dashboard-auth, drain-plugin,
token-auth, and web_server-endpoint suites green.
Ben 2026-06-22 11:27:44 +10:00 committed by Teknium
parent 2e322466b1
commit 19b2624404
5 changed files with 531 additions and 0 deletions

gateway/drain_control.py

@@ -0,0 +1,109 @@
"""External drain-control marker contract (dashboard → gateway).

Task 2.2 of the safe-shutdown plan (decisions.md Q-B, option A): the dashboard
has no way to call into a running gateway; there is no HTTP control channel
into the gateway process (guardrails: "there is NO external control channel
into a running gateway"). Restart/drain is driven only by the gateway reacting
to its own inputs: slash commands, process signals, and file markers it writes
itself (``.restart_notify.json``).

So the begin/cancel-drain dashboard endpoint communicates with the running
gateway the same way: it writes (or removes) a marker file, and a gateway
background watcher reacts to it. This module owns that marker contract so both
sides, the dashboard endpoint (writer) and the gateway watcher (reader),
share one definition and can never disagree.

Contract (presence-based, mirroring ``.restart_notify.json``):

* begin-drain: write ``{HERMES_HOME}/.drain_request.json`` with
  ``{"action": "drain", "requested_at": <iso>, "principal": <str>}``.
* cancel-drain: remove the marker.
* The gateway watcher treats **presence** of the marker as "external drain
  active": flip ``gateway_state -> "draining"`` and stop accepting new turns.
  Absence means "not draining" (revert to ``running`` if we had flipped it).

Reading the marker never raises: a malformed/half-written file reads as
"present but contentless", which the watcher still treats as drain-active
(fail-safe toward quiescing: a corrupt begin marker must not be ignored).
"""

from __future__ import annotations

import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

from hermes_constants import get_hermes_home
from utils import atomic_json_write

_log = logging.getLogger(__name__)

_DRAIN_REQUEST_FILENAME = ".drain_request.json"


def drain_request_path(home: Optional[Path] = None) -> Path:
    """Absolute path to the drain-request marker, respecting HERMES_HOME."""
    base = home if home is not None else get_hermes_home()
    return Path(base) / _DRAIN_REQUEST_FILENAME


def write_drain_request(
    *, principal: str = "drain-control", home: Optional[Path] = None
) -> dict[str, Any]:
    """Write the begin-drain marker. Returns the payload written.

    Atomic write so the gateway watcher never reads a half-written file.
    Idempotent: re-writing while a drain is already in progress just refreshes
    ``requested_at`` (harmless: the watcher keys off presence, not content).
    """
    payload = {
        "action": "drain",
        "requested_at": datetime.now(timezone.utc).isoformat(),
        "principal": principal,
    }
    atomic_json_write(drain_request_path(home), payload)
    return payload


def clear_drain_request(*, home: Optional[Path] = None) -> bool:
    """Remove the drain marker (cancel-drain). Returns True if one existed.

    Best-effort: a missing file is not an error (cancel is idempotent).
    """
    path = drain_request_path(home)
    try:
        path.unlink()
        return True
    except FileNotFoundError:
        return False
    except OSError as e:
        _log.warning("drain-control: failed to remove %s: %s", path, e)
        return False


def drain_requested(*, home: Optional[Path] = None) -> bool:
    """True iff the begin-drain marker is present (external drain active)."""
    return drain_request_path(home).exists()


def read_drain_request(*, home: Optional[Path] = None) -> Optional[dict[str, Any]]:
    """Return the marker payload, or ``None`` if absent.

    A present-but-unparseable marker returns ``{}`` (truthy-presence preserved
    via :func:`drain_requested`; callers that need the body get an empty dict
    rather than an exception). Never raises.
    """
    path = drain_request_path(home)
    try:
        raw = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return None
    except OSError as e:
        _log.warning("drain-control: failed to read %s: %s", path, e)
        return None
    try:
        data = json.loads(raw)
    except (ValueError, TypeError):
        return {}
    return data if isinstance(data, dict) else {}

gateway/run.py

@@ -2533,6 +2533,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
    _restart_drain_timeout: float = DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT
    _exit_code: Optional[int] = None
    _draining: bool = False
    _external_drain_active: bool = False
    _restart_requested: bool = False
    _restart_task_started: bool = False
    _restart_detached: bool = False
@@ -2593,6 +2594,16 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
        self._exit_reason: Optional[str] = None
        self._exit_code: Optional[int] = None
        self._draining = False
        # External (NAS-driven) drain state — distinct from the shutdown
        # ``_draining`` flag above. Set by ``_drain_control_watcher`` when the
        # ``.drain_request.json`` marker is present: the gateway flips
        # ``gateway_state -> draining`` and refuses NEW turns, but the process
        # does NOT exit (the whole point — quiesce-without-restart, D4a). It is
        # fully reversible: removing the marker reverts to ``running`` and
        # re-accepts turns. ``_draining`` (shutdown) is one-way and ends in
        # process exit; this one is a steady state NAS polls during its
        # request -> poll -> proceed loop.
        self._external_drain_active = False
        self._restart_requested = False
        # Set by shutdown_signal_handler when a SIGTERM/SIGINT arrived
        # WITHOUT a planned-stop / takeover marker — i.e. an unexpected
@@ -4003,6 +4014,85 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
        except Exception:
            pass

    # ------------------------------------------------------------------
    # External drain control (NAS-driven quiesce-without-restart, Phase 2).
    # The dashboard's begin/cancel-drain endpoint writes/removes the
    # ``.drain_request.json`` marker (gateway/drain_control.py); this watcher
    # observes the marker and flips the gateway between accepting and refusing
    # NEW turns, WITHOUT exiting the process. Reversible by design (D4a): NAS
    # POSTs begin-drain, polls /api/status until active_agents hits 0, proceeds
    # with its lifecycle action, then (on cancel/abort) the marker is removed
    # and the gateway re-accepts turns.
    # ------------------------------------------------------------------
    def _enter_external_drain(self) -> None:
        """Begin external drain: stop accepting new turns, flip state.

        Idempotent: re-entering while already draining is a no-op (the early
        return below skips the status write). In-flight turns are NOT
        interrupted (the whole point is to let them finish); only NEW turns
        are refused.
        """
        if self._external_drain_active:
            return
        self._external_drain_active = True
        logger.info(
            "External drain ENGAGED (.drain_request.json present) — refusing "
            "new turns; %d in-flight turn(s) will finish. Process stays up.",
            self._running_agent_count(),
        )
        # Flip the persisted lifecycle state so /api/status.gateway_busy /
        # gateway_drainable track the drain. Preserve active_agents (the
        # read-merge keeps the live count); only the state changes.
        self._update_runtime_status("draining")

    def _exit_external_drain(self) -> None:
        """Cancel external drain: revert state, re-accept new turns.

        Idempotent. Only reverts to ``running`` when we are actually mid-drain
        AND not also shutting down (a real shutdown ``_draining`` must win;
        never resurrect a stopping gateway to ``running``).
        """
        if not self._external_drain_active:
            return
        self._external_drain_active = False
        if self._draining or not self._running:
            # A shutdown drain is in progress / the loop has stopped — do not
            # clobber the terminal state back to running.
            logger.info(
                "External drain marker cleared during shutdown — not reverting "
                "to running (shutdown takes precedence)."
            )
            return
        logger.info(
            "External drain RELEASED (.drain_request.json removed) — "
            "re-accepting new turns; gateway_state -> running."
        )
        self._update_runtime_status("running")

    async def _drain_control_watcher(self, interval: float = 1.0) -> None:
        """Background task: reconcile gateway accept-state with the drain marker.

        Polls ``.drain_request.json`` (presence-based contract,
        gateway/drain_control.py). Marker present -> ``_enter_external_drain``;
        marker absent -> ``_exit_external_drain``. The 1s cadence bounds the
        observe-the-marker latency the live-validation gate checks (point a).
        Reconciles once at startup so a marker that survived a restart is
        honoured immediately. Best-effort: any tick error is logged and the
        loop continues (a transient stat() failure must not wedge the gateway).
        """
        from gateway.drain_control import drain_requested

        while self._running:
            try:
                if drain_requested():
                    self._enter_external_drain()
                else:
                    self._exit_external_drain()
            except asyncio.CancelledError:
                raise
            except Exception as exc:
                logger.debug("Drain-control watcher tick error: %s", exc, exc_info=True)
            await asyncio.sleep(interval)

    def _update_platform_runtime_status(
        self,
        platform: str,
@@ -6250,6 +6340,12 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
        except Exception:  # noqa: BLE001 - arming must never block startup
            logger.debug("scale-to-zero: arm check failed at startup", exc_info=True)

        # Start background drain-control watcher — reconciles the gateway's
        # new-turn accept-state with the external ``.drain_request.json`` marker
        # the dashboard begin/cancel-drain endpoint writes (Phase 2). Honours a
        # marker that survived a restart on its first tick.
        asyncio.create_task(self._drain_control_watcher())

        logger.info("Press Ctrl+C to stop")
        return True
@@ -8860,6 +8956,27 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
                return self._telegram_topic_root_lobby_message()
            return None

        # ── External-drain new-turn gate (Phase 2) ────────────────────
        # When NAS has engaged an external drain (.drain_request.json present,
        # observed by _drain_control_watcher), refuse to START a new turn so
        # the in-flight set can only fall to zero — eliminating the TOCTOU race
        # (D4a: stop accepting new turns FIRST, then NAS polls until
        # active_agents==0). In-flight turns are untouched; this only blocks the
        # claim of a NEW session slot. Internal/system events (restart-recovery
        # replays, background-process completions) bypass the gate — they are
        # not user-initiated new work and must still flow during a drain.
        # Reversible: once the marker is removed the gate opens again.
        if self._external_drain_active and not is_internal:
            logger.info(
                "Refusing new turn for session %s — external drain active.",
                _quick_key,
            )
            return (
                "⏳ This agent is draining for a maintenance action and isn't "
                "accepting new turns right now. It'll be back in a moment — "
                "please resend shortly."
            )

        # ── Claim this session before any await ───────────────────────
        # Between here and _run_agent registering the real AIAgent, there
        # are numerous await points (hooks, vision enrichment, STT,

hermes_cli/web_server.py

@@ -2639,6 +2639,71 @@ async def restart_gateway(profile: Optional[str] = None):
    }


@app.post("/api/gateway/drain")
async def gateway_drain(request: Request):
    """Begin or cancel an external (NAS-driven) gateway drain.

    Authenticated by the non-interactive token-auth seam: the
    ``dashboard_auth/drain`` plugin registers this exact path as a token route
    and verifies the ``Authorization`` bearer secret. If that plugin isn't
    active (no ``HERMES_DASHBOARD_DRAIN_SECRET``), the route is NOT a token
    route, so on a gated bind the cookie gate handles it (a browser session can
    still drive it from the dashboard) and on a loopback bind the legacy
    session-token gate applies; either way it is never unauthenticated on a
    network-exposed bind.

    Body: ``{"action": "drain"}`` (begin) or ``{"action": "cancel"}`` (cancel).
    Begin writes the ``.drain_request.json`` marker the gateway's
    ``_drain_control_watcher`` observes (flip to ``draining`` + refuse new
    turns); cancel removes it (revert to ``running`` + re-accept). Idempotent
    on both sides. This endpoint only writes/removes the marker; the gateway
    process owns the actual state transition (there is no HTTP control channel
    into the running gateway; the marker IS the channel, decisions.md Q-B).

    The force-override (D6: "unless a user commands it") is NOT here: an
    immediate, drain-skipping action maps onto the existing
    ``POST /api/gateway/restart`` force path, which supersedes a drain.
    """
    from gateway.drain_control import (
        clear_drain_request,
        drain_requested,
        write_drain_request,
    )

    try:
        body = await request.json()
    except Exception:
        body = {}
    action = str((body or {}).get("action", "drain")).strip().lower()

    # Attribute the request to the verified token principal when present
    # (token-auth seam attaches it); fall back to a generic label otherwise.
    principal_obj = getattr(request.state, "token_principal", None)
    principal = getattr(principal_obj, "principal", None) or "dashboard"

    if action == "cancel":
        existed = clear_drain_request()
        _log.info("Gateway drain CANCEL requested by %s (existed=%s)", principal, existed)
        return {"ok": True, "action": "cancel", "was_draining": existed}

    if action != "drain":
        raise HTTPException(
            status_code=400,
            detail=f"Unknown drain action {action!r}; expected 'drain' or 'cancel'",
        )

    payload = write_drain_request(principal=str(principal))
    _log.info("Gateway drain BEGIN requested by %s", principal)
    return {
        "ok": True,
        "action": "drain",
        "requested_at": payload["requested_at"],
        # Echo so a caller polling /api/status knows the marker is now set;
        # the gateway watcher flips gateway_state -> draining within ~1s.
        "draining": drain_requested(),
    }


@app.post("/api/hermes/update")
async def update_hermes():
    """Kick off ``hermes update`` in the background."""

tests/gateway/test_external_drain_control.py

@@ -0,0 +1,196 @@
"""Tests for the external drain-control marker contract + gateway state machine.

Task 2.2/2.3. Two layers:

* drain_control.py: the presence-based marker contract (write/clear/read,
  HERMES_HOME-scoped, never-raises).
* GatewayRunner enter/exit/watcher + the new-turn accept gate: the
  reversible state machine driven by the marker.

Mocked tests are necessary-not-sufficient here (the HARD live-validation gate,
Q-B, exercises a real `hermes gateway run`); these lock the unit contract.
"""

from __future__ import annotations

import asyncio
from pathlib import Path
from unittest.mock import MagicMock

import pytest

import gateway.drain_control as dc
from gateway.run import GatewayRunner
from gateway.platforms.base import MessageEvent, MessageType
from tests.gateway.restart_test_helpers import make_restart_runner, make_restart_source


# ---------------------------------------------------------------------------
# Marker contract (drain_control.py)
# ---------------------------------------------------------------------------


@pytest.fixture
def home(tmp_path, monkeypatch):
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    return tmp_path


class TestMarkerContract:
    def test_absent_by_default(self, home):
        assert dc.drain_requested() is False
        assert dc.read_drain_request() is None

    def test_write_then_present(self, home):
        payload = dc.write_drain_request(principal="nas")
        assert dc.drain_requested() is True
        assert payload["action"] == "drain"
        assert payload["principal"] == "nas"
        body = dc.read_drain_request()
        assert body is not None and body["principal"] == "nas"

    def test_clear_removes(self, home):
        dc.write_drain_request()
        assert dc.clear_drain_request() is True
        assert dc.drain_requested() is False
        # idempotent: clearing again is a no-op, returns False
        assert dc.clear_drain_request() is False

    def test_path_respects_hermes_home(self, home):
        assert dc.drain_request_path() == home / ".drain_request.json"

    def test_corrupt_marker_reads_as_present_contentless(self, home):
        # A half-written / malformed marker must still count as "drain active"
        # (fail-safe toward quiescing).
        dc.drain_request_path().write_text("{not valid json", encoding="utf-8")
        assert dc.drain_requested() is True
        assert dc.read_drain_request() == {}

    def test_write_is_atomic_json(self, home):
        dc.write_drain_request(principal="x")
        import json

        data = json.loads(dc.drain_request_path().read_text())
        assert data["action"] == "drain"


# ---------------------------------------------------------------------------
# Gateway state machine (enter / exit / idempotency)
# ---------------------------------------------------------------------------


def _drain_runner():
    runner, adapter = make_restart_runner()
    runner._external_drain_active = False
    # Bind the real methods under test.
    runner._enter_external_drain = GatewayRunner._enter_external_drain.__get__(
        runner, GatewayRunner
    )
    runner._exit_external_drain = GatewayRunner._exit_external_drain.__get__(
        runner, GatewayRunner
    )
    return runner, adapter


class TestDrainStateMachine:
    def test_enter_sets_flag_and_flips_state(self):
        runner, _ = _drain_runner()
        runner._enter_external_drain()
        assert runner._external_drain_active is True
        runner._update_runtime_status.assert_called_with("draining")

    def test_enter_idempotent(self):
        runner, _ = _drain_runner()
        runner._enter_external_drain()
        runner._update_runtime_status.reset_mock()
        runner._enter_external_drain()  # second call — no-op
        runner._update_runtime_status.assert_not_called()

    def test_exit_reverts_to_running(self):
        runner, _ = _drain_runner()
        runner._enter_external_drain()
        runner._update_runtime_status.reset_mock()
        runner._exit_external_drain()
        assert runner._external_drain_active is False
        runner._update_runtime_status.assert_called_with("running")

    def test_exit_idempotent_when_not_draining(self):
        runner, _ = _drain_runner()
        runner._exit_external_drain()  # never entered — no-op
        runner._update_runtime_status.assert_not_called()

    def test_exit_during_shutdown_does_not_revert_to_running(self):
        runner, _ = _drain_runner()
        runner._enter_external_drain()
        runner._update_runtime_status.reset_mock()
        # A shutdown drain is now in progress — exit must NOT resurrect running.
        runner._draining = True
        runner._exit_external_drain()
        assert runner._external_drain_active is False
        runner._update_runtime_status.assert_not_called()

    def test_exit_when_loop_stopped_does_not_revert(self):
        runner, _ = _drain_runner()
        runner._enter_external_drain()
        runner._update_runtime_status.reset_mock()
        runner._running = False
        runner._exit_external_drain()
        runner._update_runtime_status.assert_not_called()


# ---------------------------------------------------------------------------
# Watcher reconciliation
# ---------------------------------------------------------------------------


class TestDrainWatcher:
    @pytest.mark.asyncio
    async def test_watcher_enters_then_exits_with_marker(self, home):
        runner, _ = _drain_runner()
        runner._drain_control_watcher = GatewayRunner._drain_control_watcher.__get__(
            runner, GatewayRunner
        )
        # Run the real watcher as a task with a short interval and give it
        # time for a few ticks after each marker change.
        dc.write_drain_request()
        task = asyncio.create_task(runner._drain_control_watcher(interval=0.02))
        await asyncio.sleep(0.06)
        assert runner._external_drain_active is True

        dc.clear_drain_request()
        await asyncio.sleep(0.06)
        assert runner._external_drain_active is False

        runner._running = False
        await asyncio.sleep(0.04)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass


# ---------------------------------------------------------------------------
# New-turn accept gate
# ---------------------------------------------------------------------------


class TestNewTurnGate:
    @pytest.mark.asyncio
    async def test_new_turn_refused_during_external_drain(self):
        runner, _ = _drain_runner()
        runner._external_drain_active = True
        event = MessageEvent(
            text="hello",
            message_type=MessageType.TEXT,
            source=make_restart_source(),
            message_id="m1",
        )
        result = await runner._handle_message(event)
        assert result is not None
        assert "draining" in result.lower()

    @pytest.mark.asyncio
    async def test_in_flight_turn_not_interrupted_by_drain(self):
        # Entering drain must NOT touch the running-agents set.
        runner, _ = _drain_runner()
        sentinel = MagicMock()
        runner._running_agents["k"] = sentinel
        runner._enter_external_drain()
        assert runner._running_agents.get("k") is sentinel
        sentinel.interrupt.assert_not_called()

tests/hermes_cli/test_web_server.py

@@ -249,6 +249,50 @@ class TestWebServerEndpoints:
        assert "active_sessions" in data
        assert data["can_update_hermes"] is True

    def test_gateway_drain_begin_writes_marker(self):
        from gateway import drain_control

        resp = self.client.post("/api/gateway/drain", json={"action": "drain"})
        assert resp.status_code == 200
        data = resp.json()
        assert data["ok"] is True and data["action"] == "drain"
        assert data["draining"] is True
        assert drain_control.drain_requested() is True
        # cleanup
        drain_control.clear_drain_request()

    def test_gateway_drain_defaults_to_begin(self):
        from gateway import drain_control

        resp = self.client.post("/api/gateway/drain", json={})
        assert resp.status_code == 200
        assert resp.json()["action"] == "drain"
        assert drain_control.drain_requested() is True
        drain_control.clear_drain_request()

    def test_gateway_drain_cancel_removes_marker(self):
        from gateway import drain_control

        drain_control.write_drain_request()
        resp = self.client.post("/api/gateway/drain", json={"action": "cancel"})
        assert resp.status_code == 200
        data = resp.json()
        assert data["ok"] is True and data["action"] == "cancel"
        assert data["was_draining"] is True
        assert drain_control.drain_requested() is False

    def test_gateway_drain_cancel_idempotent(self):
        from gateway import drain_control

        resp = self.client.post("/api/gateway/drain", json={"action": "cancel"})
        assert resp.status_code == 200
        assert resp.json()["was_draining"] is False
        assert drain_control.drain_requested() is False

    def test_gateway_drain_bad_action_400(self):
        resp = self.client.post("/api/gateway/drain", json={"action": "explode"})
        assert resp.status_code == 400

    def test_get_status_hides_update_capability_in_managed_runtime(self, monkeypatch):
        import hermes_cli.web_server as web_server