From 51a338a1b6ca267f7efc474621d0691488f7e620 Mon Sep 17 00:00:00 2001 From: Ben Date: Sun, 21 Jun 2026 20:17:28 +1000 Subject: [PATCH 1/4] feat(gateway): track active_agents in runtime status on turn boundaries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The gateway only rewrote gateway_state.json on lifecycle transitions (start/connect/drain/stop), never on turn start/end. Live-verified on a hosted agent: a confirmed end-to-end turn ran while gateway_updated_at stayed frozen at boot and active_agents was absent — so any active_agents read from the file between transitions is stale. That makes it unusable as a busy/idle signal for an external consumer (NAS deciding whether it's safe to restart/migrate/auto-update an agent mid-turn). Add _persist_active_agents(), called at every turn boundary: - turn start: both running-agent sentinel-claim sites (normal inbound message path + startup-resume path) - turn end: the central _release_running_agent_state() choke point (covers normal completion, /stop, /reset, sentinel cleanup, stale-eviction — every path that ends a running turn) It passes ONLY active_agents to write_runtime_status, leaving gateway_state (and every other field) _UNSET so the read-merge-write preserves the current lifecycle state. Passing gateway_state=None would clobber it — hence a dedicated helper rather than reusing _update_runtime_status. The write is the same cheap JSON write done on lifecycle transitions today; best-effort (a failed status write never disrupts a turn). Behaviour-contract test: an active_agents-only write preserves both running and draining gateway_state, and the count clamps non-negative. 
--- gateway/run.py | 29 +++++++++++++++++++++++++ tests/gateway/test_status.py | 42 ++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/gateway/run.py b/gateway/run.py index bd991efeb69..e5df08d82d3 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3665,6 +3665,28 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except Exception: pass + def _persist_active_agents(self) -> None: + """Persist the live in-flight agent count to ``gateway_state.json``. + + Called at every turn boundary (a running-agent slot is claimed or + released) so the dashboard ``/api/status`` readout reflects in-flight + gateway turns in near-real-time. Without this the file is only + rewritten on lifecycle transitions, so any ``active_agents`` read + between transitions is stale (a turn could start and finish without the + file ever moving). + + Deliberately passes ONLY ``active_agents`` — ``gateway_state`` and the + other fields stay ``_UNSET`` so ``write_runtime_status``'s + read-merge-write preserves the current lifecycle state (``running`` / + ``draining`` / …). Passing ``gateway_state=None`` here would clobber it. + Best-effort: a failed status write must never disrupt a turn. + """ + try: + from gateway.status import write_runtime_status + write_runtime_status(active_agents=self._running_agent_count()) + except Exception: + pass + def _update_platform_runtime_status( self, platform: str, @@ -5187,6 +5209,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # instead of spinning up a duplicate AIAgent (#45456). 
self._running_agents[entry.session_key] = _AGENT_PENDING_SENTINEL self._running_agents_ts[entry.session_key] = time.time() + self._persist_active_agents() # Empty-text internal event — the _is_resume_pending branch in # _handle_message_with_agent prepends the proper reason-aware @@ -8364,6 +8387,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew self._active_session_leases[_quick_key] = _active_session_lease self._running_agents[_quick_key] = _AGENT_PENDING_SENTINEL self._running_agents_ts[_quick_key] = time.time() + self._persist_active_agents() _run_generation = self._begin_session_run_generation(_quick_key) try: @@ -13476,6 +13500,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew self._running_agents_ts.pop(session_key, None) if hasattr(self, "_busy_ack_ts"): self._busy_ack_ts.pop(session_key, None) + # Turn boundary: a running-agent slot was just released. Persist the + # new (lower) in-flight count so the dashboard readout stays current + # between lifecycle transitions. Preserves gateway_state (see + # _persist_active_agents). + self._persist_active_agents() return True def _clear_session_boundary_security_state(self, session_key: str) -> None: diff --git a/tests/gateway/test_status.py b/tests/gateway/test_status.py index e8d2f57485c..6cfc1dbf752 100644 --- a/tests/gateway/test_status.py +++ b/tests/gateway/test_status.py @@ -1091,3 +1091,45 @@ class TestCorruptStatusFiles: p = tmp_path / "gateway.pid" p.write_text("4242", encoding="utf-8") assert status._read_pid_record(p) == {"pid": 4242} + + +class TestActiveAgentsTurnBoundaryWrite: + """The load-bearing Phase 1a contract: writing the in-flight count at a + turn boundary must PRESERVE the lifecycle gateway_state. 
The whole readout + depends on active_agents being refreshed per-turn while gateway_state is + only touched by lifecycle transitions — so an active_agents-only write must + not clobber it.""" + + def test_active_agents_only_write_preserves_gateway_state(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + # Lifecycle transition sets running. + status.write_runtime_status(gateway_state="running", active_agents=0) + assert status.read_runtime_status()["gateway_state"] == "running" + + # Turn-boundary write: ONLY active_agents (gateway_state left _UNSET). + status.write_runtime_status(active_agents=2) + + rec = status.read_runtime_status() + assert rec["active_agents"] == 2 + # The state must survive the per-turn write — this is what makes the + # _persist_active_agents helper safe to call on every turn. + assert rec["gateway_state"] == "running" + + def test_active_agents_only_write_preserves_draining_state(self, tmp_path, monkeypatch): + """Same invariant while draining — a turn finishing mid-drain (count + falling) must not flip the state back to running.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + status.write_runtime_status(gateway_state="draining", active_agents=3) + status.write_runtime_status(active_agents=2) + + rec = status.read_runtime_status() + assert rec["active_agents"] == 2 + assert rec["gateway_state"] == "draining" + + def test_active_agents_clamped_non_negative(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + status.write_runtime_status(gateway_state="running", active_agents=-5) + assert status.read_runtime_status()["active_agents"] == 0 + From 0ee75469d7c66e04983083740033f6d38feba113 Mon Sep 17 00:00:00 2001 From: Ben Date: Sun, 21 Jun 2026 20:17:53 +1000 Subject: [PATCH 2/4] feat(dashboard): surface gateway busy/drainable on /api/status MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Give an external consumer (NAS) a 
trustworthy, always-reachable busy/idle readout it can poll before a disruptive lifecycle action (restart, migrate, stop, auto-update). The dashboard /api/status is the only HTTP surface guaranteed up on a hosted agent regardless of which gateway platforms are enabled, and it already reads gateway_state.json. Add to /api/status (additive, non-breaking): - active_agents — in-flight gateway-turn count (now refreshed per-turn by the companion gateway-side commit) - gateway_busy — live, in the running state, AND active_agents > 0 - gateway_drainable — running and live (a valid begin-drain target) - restart_drain_timeout — resolved seconds, so the consumer can size its poll deadline without out-of-band knowledge (env HERMES_RESTART_DRAIN_TIMEOUT → config agent.restart_drain_timeout → default) The busy/drainable contract is defined once in gateway.status (derive_gateway_busy / derive_gateway_drainable) and consumed by both /api/status and /health/detailed so the two surfaces can never disagree. Liveness keys off gateway_running (a live PID/health probe), NEVER gateway_updated_at — a healthy idle gateway never advances that timestamp. All derived fields degrade to safe falsy values when the gateway is down or the status file is absent/corrupt (never a spurious "busy" that would wedge the consumer). active_sessions (the 5-min DB recency heuristic the SPA reads) is left exactly as-is — new signal, new fields. Tests (behaviour contracts, not snapshots): the pure derivation contract across every running/state/count/liveness combination; /api/status integration for busy, idle-drainable, draining, down, stale-busy-file, corrupt-count, and timeout surfacing; and /health/detailed parity. 
--- gateway/platforms/api_server.py | 24 ++++- gateway/status.py | 43 +++++++++ hermes_cli/web_server.py | 42 ++++++++ tests/gateway/test_api_server.py | 7 ++ tests/gateway/test_status.py | 46 +++++++++ tests/hermes_cli/test_web_server.py | 143 ++++++++++++++++++++++++++++ 6 files changed, 302 insertions(+), 3 deletions(-) diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 09d0dc227a2..8d67aec85c4 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -1103,16 +1103,34 @@ class APIServerAdapter(BasePlatformAdapter): dashboard can display full status without needing a shared PID file or /proc access. No authentication required. """ - from gateway.status import read_runtime_status + from gateway.status import ( + derive_gateway_busy, + derive_gateway_drainable, + read_runtime_status, + ) runtime = read_runtime_status() or {} + gw_state = runtime.get("gateway_state") + gw_active = runtime.get("active_agents", 0) + # This endpoint is served BY the gateway process, so it is by definition + # alive — gateway_running is True. Derive busy/drainable from the same + # shared contract /api/status uses so the two surfaces never disagree. 
return web.json_response({ "status": "ok", "platform": "hermes-agent", "version": _hermes_version(), - "gateway_state": runtime.get("gateway_state"), + "gateway_state": gw_state, "platforms": runtime.get("platforms", {}), - "active_agents": runtime.get("active_agents", 0), + "active_agents": gw_active, + "gateway_busy": derive_gateway_busy( + gateway_running=True, + gateway_state=gw_state, + active_agents=gw_active, + ), + "gateway_drainable": derive_gateway_drainable( + gateway_running=True, + gateway_state=gw_state, + ), "exit_reason": runtime.get("exit_reason"), "updated_at": runtime.get("updated_at"), "pid": os.getpid(), diff --git a/gateway/status.py b/gateway/status.py index b4bee42fdad..d5f956a6cd6 100644 --- a/gateway/status.py +++ b/gateway/status.py @@ -621,6 +621,49 @@ def read_runtime_status() -> Optional[dict[str, Any]]: return _read_json_file(_get_runtime_status_path()) +# States in which the gateway is alive and could be asked to drain. Anything +# else (draining already, stopping, stopped, startup_failed, None) is NOT a +# valid begin-drain target. +_DRAINABLE_GATEWAY_STATES = frozenset({"running"}) + + +def derive_gateway_busy( + *, gateway_running: bool, gateway_state: Any, active_agents: Any +) -> bool: + """Whether the gateway is actively processing in-flight turns. + + The contract NAS gates lifecycle actions on. Busy iff the gateway is live + (``gateway_running``), in the ``running`` state, AND at least one agent is + mid-turn (``active_agents > 0``). Degrades to ``False`` whenever liveness + is unknown, the state is anything but ``running``, or the count is + absent/unparseable — i.e. a down or file-absent gateway reads "not busy", + never a spurious "busy". + + NOTE: liveness keys off ``gateway_running`` (a live PID / health probe), + NEVER ``updated_at`` — a healthy idle gateway never advances that timestamp. 
+ """ + if not gateway_running: + return False + if gateway_state not in _DRAINABLE_GATEWAY_STATES: + return False + try: + return int(active_agents) > 0 + except (TypeError, ValueError): + return False + + +def derive_gateway_drainable(*, gateway_running: bool, gateway_state: Any) -> bool: + """Whether the gateway can accept a begin-drain request right now. + + True iff the gateway is live and in the ``running`` state — i.e. not already + draining/stopping/stopped and not in a failed-start state. This is + independent of ``active_agents``: an idle running gateway is drainable (the + drain just completes immediately). Degrades to ``False`` for a down or + non-running gateway. + """ + return bool(gateway_running) and gateway_state in _DRAINABLE_GATEWAY_STATES + + def get_runtime_status_running_pid( runtime: Optional[dict[str, Any]] = None, ) -> Optional[int]: diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index 398e61772f0..487ba7a3538 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -69,6 +69,8 @@ from hermes_cli.memory_providers import ( get_memory_provider, ) from gateway.status import ( + derive_gateway_busy, + derive_gateway_drainable, get_running_pid, get_runtime_status_running_pid, read_runtime_status, @@ -1835,6 +1837,42 @@ async def get_status(profile: Optional[str] = None): except Exception: pass + # Busy/drainable readout (NAS lifecycle-safety gate). active_agents is + # the in-flight gateway-turn count the gateway now persists at every + # turn boundary; gateway_busy/gateway_drainable are derived from it + + # liveness via the single shared contract in gateway.status. Liveness + # keys off gateway_running (a live PID/health probe), NEVER + # gateway_updated_at — a healthy idle gateway never advances that. 
+ active_agents = 0 + if runtime: + try: + active_agents = max(0, int(runtime.get("active_agents", 0) or 0)) + except (TypeError, ValueError): + active_agents = 0 + gateway_busy = derive_gateway_busy( + gateway_running=gateway_running, + gateway_state=gateway_state, + active_agents=active_agents, + ) + gateway_drainable = derive_gateway_drainable( + gateway_running=gateway_running, + gateway_state=gateway_state, + ) + # Resolved drain timeout (seconds) so NAS can size its poll deadline + # without out-of-band knowledge. Mirrors gateway/restart.py precedence: + # HERMES_RESTART_DRAIN_TIMEOUT env override → config agent.* → default. + from gateway.restart import parse_restart_drain_timeout + + _drain_timeout_raw = os.environ.get("HERMES_RESTART_DRAIN_TIMEOUT") + if _drain_timeout_raw is None: + try: + _drain_timeout_raw = cfg_get( + load_config(), "agent", "restart_drain_timeout", default=None + ) + except Exception: + _drain_timeout_raw = None + restart_drain_timeout = parse_restart_drain_timeout(_drain_timeout_raw) + # Dashboard auth gate (Phase 7): surface whether the gate is engaged # and which providers are registered so ``hermes status`` and the # SPA's StatusPage can show "OAuth gate ON via Nous Research" or @@ -1863,6 +1901,10 @@ async def get_status(profile: Optional[str] = None): "gateway_platforms": gateway_platforms, "gateway_exit_reason": gateway_exit_reason, "gateway_updated_at": gateway_updated_at, + "active_agents": active_agents, + "gateway_busy": gateway_busy, + "gateway_drainable": gateway_drainable, + "restart_drain_timeout": restart_drain_timeout, "active_sessions": active_sessions, "auth_required": auth_required, "auth_providers": auth_providers, diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index ac5e29c4d3c..6588a70fa7a 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -584,6 +584,10 @@ class TestHealthDetailedEndpoint: assert data["gateway_state"] == "running" assert 
data["platforms"] == {"telegram": {"state": "connected"}} assert data["active_agents"] == 2 + # Derived busy/drainable: this endpoint is served BY the live + # gateway, so running + 2 agents ⇒ busy and drainable. + assert data["gateway_busy"] is True + assert data["gateway_drainable"] is True assert isinstance(data["pid"], int) assert "updated_at" in data @@ -599,6 +603,9 @@ class TestHealthDetailedEndpoint: assert data["status"] == "ok" assert data["gateway_state"] is None assert data["platforms"] == {} + # No runtime file ⇒ state None ⇒ not busy, not drainable. + assert data["gateway_busy"] is False + assert data["gateway_drainable"] is False @pytest.mark.asyncio async def test_health_detailed_does_not_require_auth(self, auth_adapter): diff --git a/tests/gateway/test_status.py b/tests/gateway/test_status.py index 6cfc1dbf752..22f92c81ef4 100644 --- a/tests/gateway/test_status.py +++ b/tests/gateway/test_status.py @@ -1132,4 +1132,50 @@ class TestActiveAgentsTurnBoundaryWrite: monkeypatch.setenv("HERMES_HOME", str(tmp_path)) status.write_runtime_status(gateway_state="running", active_agents=-5) assert status.read_runtime_status()["active_agents"] == 0 +class TestGatewayBusyDerivation: + """Pure contract for derive_gateway_busy / derive_gateway_drainable — the + single shared definition both /api/status and /health/detailed consume.""" + def test_busy_requires_running_state_and_positive_count(self): + assert status.derive_gateway_busy( + gateway_running=True, gateway_state="running", active_agents=1 + ) is True + assert status.derive_gateway_busy( + gateway_running=True, gateway_state="running", active_agents=0 + ) is False + + def test_busy_false_when_not_live_even_if_file_says_active(self): + # Liveness wins: gateway_running False ⇒ never busy, regardless of count. 
+ assert status.derive_gateway_busy( + gateway_running=False, gateway_state="running", active_agents=9 + ) is False + + def test_busy_false_for_non_running_states(self): + for state in ("draining", "stopping", "stopped", "startup_failed", None): + assert status.derive_gateway_busy( + gateway_running=True, gateway_state=state, active_agents=5 + ) is False, state + + def test_busy_degrades_on_unparseable_count(self): + for bad in (None, "garbage", object()): + assert status.derive_gateway_busy( + gateway_running=True, gateway_state="running", active_agents=bad + ) is False + + def test_drainable_is_running_and_live_independent_of_count(self): + # Idle running gateway is drainable but NOT busy. + assert status.derive_gateway_drainable( + gateway_running=True, gateway_state="running" + ) is True + assert status.derive_gateway_busy( + gateway_running=True, gateway_state="running", active_agents=0 + ) is False + + def test_drainable_false_when_down_or_not_running(self): + assert status.derive_gateway_drainable( + gateway_running=False, gateway_state="running" + ) is False + for state in ("draining", "stopped", None): + assert status.derive_gateway_drainable( + gateway_running=True, gateway_state=state + ) is False, state diff --git a/tests/hermes_cli/test_web_server.py b/tests/hermes_cli/test_web_server.py index 3ce5582619a..25189cd6af5 100644 --- a/tests/hermes_cli/test_web_server.py +++ b/tests/hermes_cli/test_web_server.py @@ -4271,6 +4271,149 @@ class TestStatusRemoteGateway: assert data["gateway_state"] == "running" +class TestGatewayBusyReadout: + """Tests for the NAS busy/drainable readout on /api/status. + + Behaviour contracts (not snapshots): assert how gateway_busy / gateway_drainable + must RELATE to gateway_running + gateway_state + active_agents, and that every + field degrades to a safe falsy value when the gateway is down or its status + file is absent. Liveness must key off gateway_running, NEVER gateway_updated_at. 
+ """ + + @pytest.fixture(autouse=True) + def _setup_test_client(self): + try: + from starlette.testclient import TestClient + except ImportError: + pytest.skip("fastapi/starlette not installed") + + from hermes_cli.web_server import app, _SESSION_HEADER_NAME, _SESSION_TOKEN + self.client = TestClient(app) + self.client.headers[_SESSION_HEADER_NAME] = _SESSION_TOKEN + + def test_busy_when_running_with_active_agents(self, monkeypatch): + """gateway_busy is True iff running AND active_agents > 0.""" + import hermes_cli.web_server as ws + + monkeypatch.setattr(ws, "get_running_pid", lambda: 1234) + monkeypatch.setattr(ws, "read_runtime_status", lambda: { + "gateway_state": "running", + "platforms": {}, + "active_agents": 2, + # A deliberately stale timestamp: busy must NOT depend on it. + "updated_at": "2020-01-01T00:00:00+00:00", + }) + + data = self.client.get("/api/status").json() + assert data["active_agents"] == 2 + assert data["gateway_busy"] is True + assert data["gateway_drainable"] is True + + def test_idle_running_is_drainable_but_not_busy(self, monkeypatch): + """A running gateway with zero in-flight turns is drainable, not busy.""" + import hermes_cli.web_server as ws + + monkeypatch.setattr(ws, "get_running_pid", lambda: 1234) + monkeypatch.setattr(ws, "read_runtime_status", lambda: { + "gateway_state": "running", + "platforms": {}, + "active_agents": 0, + }) + + data = self.client.get("/api/status").json() + assert data["active_agents"] == 0 + assert data["gateway_busy"] is False + assert data["gateway_drainable"] is True + + def test_draining_state_is_neither_busy_nor_drainable(self, monkeypatch): + """While draining, the gateway is not a fresh begin-drain target, and + busy is False even with a stale active_agents>0 in the file — the state + gate dominates.""" + import hermes_cli.web_server as ws + + monkeypatch.setattr(ws, "get_running_pid", lambda: 1234) + monkeypatch.setattr(ws, "read_runtime_status", lambda: { + "gateway_state": "draining", + 
"platforms": {}, + "active_agents": 3, + }) + + data = self.client.get("/api/status").json() + assert data["gateway_busy"] is False + assert data["gateway_drainable"] is False + + def test_down_gateway_degrades_to_safe_falsy(self, monkeypatch): + """Gateway down (no PID, no remote probe): busy/drainable False, + active_agents 0 — never a spurious busy that would wedge NAS.""" + import hermes_cli.web_server as ws + + monkeypatch.setattr(ws, "get_running_pid", lambda: None) + monkeypatch.setattr(ws, "read_runtime_status", lambda: None) + monkeypatch.setattr(ws, "_GATEWAY_HEALTH_URL", None) + + data = self.client.get("/api/status").json() + assert data["gateway_running"] is False + assert data["active_agents"] == 0 + assert data["gateway_busy"] is False + assert data["gateway_drainable"] is False + + def test_down_gateway_with_stale_busy_file_still_not_busy(self, monkeypatch): + """A leftover status file claiming running + active_agents>0 must NOT + read as busy when the live PID probe says the gateway is down. Liveness + wins over the file.""" + import hermes_cli.web_server as ws + + monkeypatch.setattr(ws, "get_running_pid", lambda: None) + monkeypatch.setattr(ws, "_GATEWAY_HEALTH_URL", None) + # File says running with active turns, but get_running_pid()==None and + # get_runtime_status_running_pid finds no live PID → gateway_running False. 
+ monkeypatch.setattr(ws, "get_runtime_status_running_pid", lambda *_a, **_k: None) + monkeypatch.setattr(ws, "read_runtime_status", lambda: { + "gateway_state": "running", + "platforms": {}, + "active_agents": 5, + }) + + data = self.client.get("/api/status").json() + assert data["gateway_running"] is False + assert data["gateway_busy"] is False + assert data["gateway_drainable"] is False + + def test_restart_drain_timeout_surfaced_and_numeric(self, monkeypatch): + """restart_drain_timeout is present and resolves to a non-negative + float so NAS can size its poll deadline without out-of-band knowledge.""" + import hermes_cli.web_server as ws + + monkeypatch.setattr(ws, "get_running_pid", lambda: 1234) + monkeypatch.setattr(ws, "read_runtime_status", lambda: { + "gateway_state": "running", + "platforms": {}, + "active_agents": 0, + }) + monkeypatch.setenv("HERMES_RESTART_DRAIN_TIMEOUT", "90") + + data = self.client.get("/api/status").json() + assert "restart_drain_timeout" in data + assert isinstance(data["restart_drain_timeout"], (int, float)) + assert data["restart_drain_timeout"] == 90.0 + + def test_active_agents_unparseable_in_file_degrades_to_zero(self, monkeypatch): + """A corrupt active_agents value in the status file must not 500 or + produce a spurious busy — it degrades to 0/not-busy.""" + import hermes_cli.web_server as ws + + monkeypatch.setattr(ws, "get_running_pid", lambda: 1234) + monkeypatch.setattr(ws, "read_runtime_status", lambda: { + "gateway_state": "running", + "platforms": {}, + "active_agents": "garbage", + }) + + data = self.client.get("/api/status").json() + assert data["active_agents"] == 0 + assert data["gateway_busy"] is False + + # --------------------------------------------------------------------------- # Dashboard theme normaliser tests # --------------------------------------------------------------------------- From b577f25100c64d438cc90c78376ebcbde937950f Mon Sep 17 00:00:00 2001 From: kshitijk4poor 
<82637225+kshitijk4poor@users.noreply.github.com> Date: Sun, 21 Jun 2026 16:37:42 +0530 Subject: [PATCH 3/4] refactor(gateway): dedupe drain-timeout resolution + share active_agents parse Follow-up cleanups on top of the busy/idle readout (PR #50103): - web_server.py /api/status reused the single drain-timeout resolver hermes_cli.gateway._get_restart_drain_timeout() (HERMES_RESTART_DRAIN_TIMEOUT env -> agent.restart_drain_timeout config -> default) instead of inlining a third hand-rolled copy of that precedence chain. Also fixes a subtle divergence: the inline copy used os.environ.get() so a set-but-empty env var was treated as a value rather than falling through to config; the shared resolver .strip()s and falls through correctly. - Added gateway.status.parse_active_agents() and routed BOTH HTTP surfaces (/api/status and /health/detailed) through it, so the exposed active_agents field is consistently clamped non-negative. Previously /api/status clamped while /health/detailed exposed the raw file value, diverging on a corrupt count. - Added TestParseActiveAgents covering the shared coercion contract. 
--- gateway/platforms/api_server.py | 3 ++- gateway/status.py | 15 +++++++++++++++ hermes_cli/web_server.py | 28 ++++++++++------------------ tests/gateway/test_status.py | 28 ++++++++++++++++++++++++++++ 4 files changed, 55 insertions(+), 19 deletions(-) diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 8d67aec85c4..aa968dcb98c 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -1106,12 +1106,13 @@ class APIServerAdapter(BasePlatformAdapter): from gateway.status import ( derive_gateway_busy, derive_gateway_drainable, + parse_active_agents, read_runtime_status, ) runtime = read_runtime_status() or {} gw_state = runtime.get("gateway_state") - gw_active = runtime.get("active_agents", 0) + gw_active = parse_active_agents(runtime.get("active_agents", 0)) # This endpoint is served BY the gateway process, so it is by definition # alive — gateway_running is True. Derive busy/drainable from the same # shared contract /api/status uses so the two surfaces never disagree. diff --git a/gateway/status.py b/gateway/status.py index d5f956a6cd6..b925571c96d 100644 --- a/gateway/status.py +++ b/gateway/status.py @@ -621,6 +621,21 @@ def read_runtime_status() -> Optional[dict[str, Any]]: return _read_json_file(_get_runtime_status_path()) +def parse_active_agents(raw: Any) -> int: + """Coerce a persisted ``active_agents`` value to a clamped non-negative int. + + The status file is written atomically but can still hold an + absent/None/garbage ``active_agents`` after a partial write or a manual + edit. Both HTTP surfaces (``/api/status`` and ``/health/detailed``) read it + through this single helper so the field they expose is consistent and never + negative. Mirrors the write-side clamp in ``write_runtime_status``. + """ + try: + return max(0, int(raw)) + except (TypeError, ValueError): + return 0 + + # States in which the gateway is alive and could be asked to drain. 
Anything # else (draining already, stopping, stopped, startup_failed, None) is NOT a # valid begin-drain target. diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index 487ba7a3538..8e1e0e72124 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -73,6 +73,7 @@ from gateway.status import ( derive_gateway_drainable, get_running_pid, get_runtime_status_running_pid, + parse_active_agents, read_runtime_status, ) from utils import env_var_enabled @@ -1843,12 +1844,7 @@ async def get_status(profile: Optional[str] = None): # liveness via the single shared contract in gateway.status. Liveness # keys off gateway_running (a live PID/health probe), NEVER # gateway_updated_at — a healthy idle gateway never advances that. - active_agents = 0 - if runtime: - try: - active_agents = max(0, int(runtime.get("active_agents", 0) or 0)) - except (TypeError, ValueError): - active_agents = 0 + active_agents = parse_active_agents(runtime.get("active_agents", 0)) if runtime else 0 gateway_busy = derive_gateway_busy( gateway_running=gateway_running, gateway_state=gateway_state, @@ -1859,19 +1855,15 @@ async def get_status(profile: Optional[str] = None): gateway_state=gateway_state, ) # Resolved drain timeout (seconds) so NAS can size its poll deadline - # without out-of-band knowledge. Mirrors gateway/restart.py precedence: - # HERMES_RESTART_DRAIN_TIMEOUT env override → config agent.* → default. - from gateway.restart import parse_restart_drain_timeout + # without out-of-band knowledge. Reuse the single resolver + # (HERMES_RESTART_DRAIN_TIMEOUT env → config agent.restart_drain_timeout + # → default) rather than re-deriving the precedence chain here. 
+ try: + from hermes_cli.gateway import _get_restart_drain_timeout - _drain_timeout_raw = os.environ.get("HERMES_RESTART_DRAIN_TIMEOUT") - if _drain_timeout_raw is None: - try: - _drain_timeout_raw = cfg_get( - load_config(), "agent", "restart_drain_timeout", default=None - ) - except Exception: - _drain_timeout_raw = None - restart_drain_timeout = parse_restart_drain_timeout(_drain_timeout_raw) + restart_drain_timeout = _get_restart_drain_timeout() + except Exception: + restart_drain_timeout = None # Dashboard auth gate (Phase 7): surface whether the gate is engaged # and which providers are registered so ``hermes status`` and the diff --git a/tests/gateway/test_status.py b/tests/gateway/test_status.py index 22f92c81ef4..63f90fe3332 100644 --- a/tests/gateway/test_status.py +++ b/tests/gateway/test_status.py @@ -1093,6 +1093,34 @@ class TestCorruptStatusFiles: assert status._read_pid_record(p) == {"pid": 4242} +class TestParseActiveAgents: + """The shared read-side coercion used by BOTH HTTP surfaces (/api/status + and /health/detailed) so the exposed active_agents field is consistent and + never negative regardless of what the status file holds.""" + + def test_valid_int_passthrough(self): + assert status.parse_active_agents(3) == 3 + + def test_zero(self): + assert status.parse_active_agents(0) == 0 + + def test_numeric_string_coerced(self): + assert status.parse_active_agents("5") == 5 + + def test_negative_clamped_to_zero(self): + assert status.parse_active_agents(-3) == 0 + + def test_none_degrades_to_zero(self): + assert status.parse_active_agents(None) == 0 + + def test_garbage_string_degrades_to_zero(self): + assert status.parse_active_agents("garbage") == 0 + + def test_float_truncates(self): + # int() truncation, then clamp — never raises. 
+ assert status.parse_active_agents(2.9) == 2 + + class TestActiveAgentsTurnBoundaryWrite: """The load-bearing Phase 1a contract: writing the in-flight count at a turn boundary must PRESERVE the lifecycle gateway_state. The whole readout From 4d7bb382b08d1d3b6a3e70869a6ffcc143efebde Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Sun, 21 Jun 2026 16:43:13 +0530 Subject: [PATCH 4/4] refactor(gateway): route all active_agents coercion through parse_active_agents; harden drain-timeout fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second cleanup pass (simplify-code review of the first follow-up): - write_runtime_status now clamps active_agents via parse_active_agents instead of an inline max(0, int(...)). Removes the duplicated clamp the helper's docstring acknowledged AND closes a write-side ValueError gap (a non-numeric active_agents previously raised; now degrades to 0). - hermes_cli/gateway.py draining-status line routes its active-agents count through parse_active_agents too — the third coercion site of the same persisted field, now consistent and non-raising with the two HTTP surfaces. - web_server.py /api/status: the drain-timeout resolver fallback now catches ImportError specifically and falls back to DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT (a real float) instead of a blanket 'except Exception -> None'. None would have violated the surfaced field's int/float contract and stripped NAS's poll-deadline hint silently. - Dropped a redundant 'if runtime else 0' branch (parse_active_agents already handles the empty/None case) and tightened the parse_active_agents docstring to describe the actual single-contract role (write + both reads). 
--- gateway/status.py | 12 ++++++------ hermes_cli/gateway.py | 4 +++- hermes_cli/web_server.py | 10 +++++++--- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/gateway/status.py b/gateway/status.py index b925571c96d..c13752af171 100644 --- a/gateway/status.py +++ b/gateway/status.py @@ -595,7 +595,7 @@ def write_runtime_status( if restart_requested is not _UNSET: payload["restart_requested"] = bool(restart_requested) if active_agents is not _UNSET: - payload["active_agents"] = max(0, int(active_agents)) + payload["active_agents"] = parse_active_agents(active_agents) if served_profiles is not _UNSET: # Profiles this gateway multiplexes (multi-profile mode). Absent/empty # for a single-profile gateway. Lets `hermes status` show per-profile @@ -624,11 +624,11 @@ def read_runtime_status() -> Optional[dict[str, Any]]: def parse_active_agents(raw: Any) -> int: """Coerce a persisted ``active_agents`` value to a clamped non-negative int. - The status file is written atomically but can still hold an - absent/None/garbage ``active_agents`` after a partial write or a manual - edit. Both HTTP surfaces (``/api/status`` and ``/health/detailed``) read it - through this single helper so the field they expose is consistent and never - negative. Mirrors the write-side clamp in ``write_runtime_status``. + The shared coercion for the in-flight gateway-turn count. Used on the WRITE + side (``write_runtime_status``) and by both HTTP read surfaces + (``/api/status`` and ``/health/detailed``) so the count is clamped to a + single contract — never negative, never raising on a manually-edited or + otherwise non-numeric value (degrades to ``0``). 
""" try: return max(0, int(raw)) diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py index cf65af98c40..34f7b96a984 100644 --- a/hermes_cli/gateway.py +++ b/hermes_cli/gateway.py @@ -4573,7 +4573,9 @@ def _runtime_health_lines() -> list[str]: lines.append(f"⚠ Last startup issue: {exit_reason}") elif gateway_state == "draining": action = "restart" if restart_requested else "shutdown" - count = int(active_agents or 0) + from gateway.status import parse_active_agents + + count = parse_active_agents(active_agents) lines.append(f"⏳ Gateway draining for {action} ({count} active agent(s))") elif gateway_state == "stopped" and exit_reason: lines.append(f"⚠ Last shutdown reason: {exit_reason}") diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index 8e1e0e72124..74ea8182533 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -1844,7 +1844,7 @@ async def get_status(profile: Optional[str] = None): # liveness via the single shared contract in gateway.status. Liveness # keys off gateway_running (a live PID/health probe), NEVER # gateway_updated_at — a healthy idle gateway never advances that. - active_agents = parse_active_agents(runtime.get("active_agents", 0)) if runtime else 0 + active_agents = parse_active_agents((runtime or {}).get("active_agents", 0)) gateway_busy = derive_gateway_busy( gateway_running=gateway_running, gateway_state=gateway_state, @@ -1862,8 +1862,12 @@ async def get_status(profile: Optional[str] = None): from hermes_cli.gateway import _get_restart_drain_timeout restart_drain_timeout = _get_restart_drain_timeout() - except Exception: - restart_drain_timeout = None + except ImportError: + # Resolver moved/renamed — fall back to the real default so the + # field stays a numeric poll-deadline hint, never None. 
+ from gateway.restart import DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT + + restart_drain_timeout = DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT # Dashboard auth gate (Phase 7): surface whether the gate is engaged # and which providers are registered so ``hermes status`` and the