From d1cac0e5ef835eef691f33ebdc470905a78cfb38 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 25 Jun 2026 09:51:02 +1000 Subject: [PATCH] feat(gateway): scale-to-zero idle detection + dormant-quiesce (Phase 0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The gateway-side BEHAVIOUR layer that consumes the relay scale-to-zero primitives (gateway-gateway Phase 5): the gateway decides it is idle and drives the relay transport dormant so the platform (Fly autostop:"suspend") can suspend the now-traffic-idle machine, which wakes on the connector's wakeUrl poke (decisions.md Q3=C', D1-D13). - gateway/scale_to_zero.py: pure helpers — scale_to_zero_enabled (the NAS Labs HERMES_SCALE_TO_ZERO stamp, D11/Q8=A), parse_idle_timeout_seconds (config.yaml gateway.scale_to_zero.idle_timeout_minutes, D2), messaging_is_relay_only_or_absent (F6/D1), should_arm (D1/D11/§3.4(1)), is_idle (D2/D3/F7). - gateway/run.py: _last_inbound_at clock stamped on user inbound in _handle_message (F13); the arm-gate + idle predicate + the _scale_to_zero_watcher dormant sequence (mark draining -> adapter go_dormant() -> cooldown), started only when armed. Deliberately NOT the stop path and NOT mark_resume_pending (F12/D13). - tools/process_registry.py: has_any_active() for the bg-work guard (D3/F7). - hermes_cli/config.py: gateway.scale_to_zero.idle_timeout_minutes default 5. Tests: 38 pure-logic + 6 watcher (incl. bg-work regression guard proven RED). Full relay + scale-to-zero suites: 184 passed. The 20 unrelated failures in the broader run are PRE-EXISTING on origin/main (custom-provider/tools tests), confirmed via a pristine baseline worktree. --- gateway/run.py | 174 ++++++++++++++++++++ gateway/scale_to_zero.py | 124 ++++++++++++++ hermes_cli/config.py | 12 ++ tests/gateway/test_scale_to_zero.py | 144 ++++++++++++++++ tests/gateway/test_scale_to_zero_watcher.py | 120 ++++++++++++++ tools/process_registry.py | 17 ++ 6 files changed, 591 insertions(+) create mode 100644 gateway/scale_to_zero.py create mode 100644 tests/gateway/test_scale_to_zero.py create mode 100644 tests/gateway/test_scale_to_zero_watcher.py diff --git a/gateway/run.py b/gateway/run.py index 4b285287b22..bcee403b1ce 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2783,6 +2783,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # Track background tasks to prevent garbage collection mid-execution self._background_tasks: set = set() + # scale-to-zero (Phase 0, F13): gateway-scoped "last inbound seen" clock. + # There is no such clock today (only a per-agent _last_activity_ts), so the + # idle predicate needs this. Stamped in _handle_message (the single inbound + # chokepoint all adapters call); seeded to "now" so a fresh gateway isn't + # considered idle from epoch. The scale-to-zero watcher (started only when + # the instance is opted in + relay-only + has a wakeUrl) reads it. + self._last_inbound_at: float = time.time() + # Set after a wake (re-arm cooldown, 0.F) so we don't immediately re-go + # dormant before the drained backlog has a chance to update the clock. + self._scale_to_zero_cooldown_until: float = 0.0 + def _wire_teams_pipeline_runtime(self) -> None: """Bind the Teams meeting pipeline runtime to Graph webhook ingress. @@ -3568,6 +3579,145 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew def _running_agent_count(self) -> int: return len(self._running_agents) + # ── scale-to-zero idle detection / dormant-quiesce (Phase 0) ────────────── + # The gateway-side BEHAVIOUR that consumes the relay scale-to-zero primitives + # (gateway-gateway Phase 5). Pure logic lives in gateway/scale_to_zero.py; the + # methods here bind it to the live runner/transport. See ~/nous/specs/ + # scale-to-zero (decisions.md) for the design + the F12/F14 distinctions. + + def _scale_to_zero_has_live_background_work(self) -> bool: + """Live background work that must block a suspend (D3/F7). + + Backgrounded delegate_task / kanban / terminal(background=true) are NOT + counted by _running_agent_count(), but suspending mid-flight loses them. + Checks the runner's own tracked tasks + the process registry's running + processes + any pending process-completion watchers. + """ + if any(not t.done() for t in self._background_tasks): + return True + try: + from tools.process_registry import process_registry + + if process_registry.has_any_active(): + return True + if process_registry.pending_watchers: + return True + except Exception: # noqa: BLE001 - never let the idle check raise + logger.debug("scale-to-zero bg-work check failed", exc_info=True) + return False + + def _scale_to_zero_idle_timeout_seconds(self) -> float: + from gateway.scale_to_zero import parse_idle_timeout_seconds + + raw = None + try: + user_cfg = _load_gateway_config() + gw = user_cfg.get("gateway") if isinstance(user_cfg, dict) else None + stz = gw.get("scale_to_zero") if isinstance(gw, dict) else None + if isinstance(stz, dict): + raw = stz.get("idle_timeout_minutes") + except Exception: # noqa: BLE001 + raw = None + return parse_idle_timeout_seconds(raw) + + def _scale_to_zero_should_arm(self) -> bool: + """Whether to start the idle watcher (D1/D11/§3.4(1)).""" + from gateway.relay import relay_wake_url + from gateway.scale_to_zero import ( + messaging_is_relay_only_or_absent, + scale_to_zero_enabled, + should_arm, + ) + + try: + platforms = list(self.config.platforms.keys()) if self.config else [] + except Exception: # noqa: BLE001 + platforms = [] + try: + wake_url = relay_wake_url() + except Exception: # noqa: BLE001 + wake_url = None + return should_arm( + enabled=scale_to_zero_enabled(), + relay_only_or_absent=messaging_is_relay_only_or_absent(platforms), + wake_url=wake_url, + ) + + def _scale_to_zero_is_idle(self) -> bool: + from gateway.scale_to_zero import is_idle + + return is_idle( + running_agent_count=self._running_agent_count(), + seconds_since_last_inbound=time.time() - self._last_inbound_at, + idle_timeout_seconds=self._scale_to_zero_idle_timeout_seconds(), + has_live_background_work=self._scale_to_zero_has_live_background_work(), + ) + + def _relay_adapter_for_dormancy(self): + """Return the connected RELAY adapter, if any (the one go_dormant targets).""" + try: + from gateway.platforms.base import Platform + except Exception: # noqa: BLE001 + return None + return self.adapters.get(Platform.RELAY) + + async def _scale_to_zero_watcher(self, interval: float = 30.0) -> None: + """Watch for idle and drive the relay dormant so the platform can suspend. + + Started ONLY when _scale_to_zero_should_arm() (opted in via the Labs + HERMES_SCALE_TO_ZERO stamp + relay-only/absent messaging + a wakeUrl). + On a sustained idle window it runs the DORMANT sequence (D12/F12/F14): + - mark runtime status `draining` (composes with the existing state + machine, §3.4(6); does NOT set _running=False), + - relay adapter.go_dormant() — going_idle->ack + supervisor-preserving + socket close (NOT disconnect(), NOT the run.py stop path), + - deliberately NO mark_resume_pending (D13 — suspend preserves RAM). + The process stays alive; the platform (Fly autostop:"suspend") suspends + the now-traffic-idle machine and autostart wakes it on the wakeUrl poke, + at which point the preserved reconnect supervisor re-dials and the + connector drains the buffered backlog. After driving dormant we set a + re-arm cooldown so a wake's drained backlog isn't immediately re-quiesced. + """ + await asyncio.sleep(min(interval, 30.0)) # let startup settle + while self._running: + try: + await asyncio.sleep(interval) + if not self._running: + return + if time.time() < self._scale_to_zero_cooldown_until: + continue + if not self._scale_to_zero_is_idle(): + continue + adapter = self._relay_adapter_for_dormancy() + if adapter is None: + continue + go_dormant = getattr(adapter, "go_dormant", None) + if not callable(go_dormant): + continue + logger.info( + "scale-to-zero: gateway idle for >= %.0fs — going dormant " + "(relay buffered, socket closed, awaiting platform suspend)", + self._scale_to_zero_idle_timeout_seconds(), + ) + try: + self._update_runtime_status("draining") + except Exception: # noqa: BLE001 - status is best-effort + logger.debug("scale-to-zero: status mark failed", exc_info=True) + try: + result = go_dormant() + if asyncio.iscoroutine(result): + await result + except Exception: # noqa: BLE001 - dormancy is best-effort + logger.debug("scale-to-zero: go_dormant failed", exc_info=True) + # 0.F: after a wake the drained inbound updates _last_inbound_at, + # but give it a window so we don't immediately re-go-dormant on the + # same idle reading before traffic lands. + self._scale_to_zero_cooldown_until = time.time() + max(interval, 60.0) + except asyncio.CancelledError: + raise + except Exception: # noqa: BLE001 - the watcher must never crash the gateway + logger.debug("scale-to-zero watcher iteration error", exc_info=True) + def _status_action_label(self) -> str: return "restart" if self._restart_requested else "shutdown" @@ -5965,6 +6115,22 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # idle case where the subagent finishes with no agent turn running. asyncio.create_task(self._async_delegation_watcher()) + # Start the scale-to-zero idle watcher ONLY when this instance is opted + # in (the NAS "Labs" HERMES_SCALE_TO_ZERO stamp), messaging is + # relay-only/absent, and a wakeUrl is registered (decisions.md D1/D11/ + # §3.4(1)). A non-opted instance never starts it, so behaviour is exactly + # as today. When armed, the watcher drives the relay dormant on sustained + # idle so the platform (Fly autostop:"suspend") can suspend the machine. + try: + if self._scale_to_zero_should_arm(): + logger.info( + "scale-to-zero: armed (idle timeout %.0fs) — watching for idle", + self._scale_to_zero_idle_timeout_seconds(), + ) + asyncio.create_task(self._scale_to_zero_watcher()) + except Exception: # noqa: BLE001 - arming must never block startup + logger.debug("scale-to-zero: arm check failed at startup", exc_info=True) + logger.info("Press Ctrl+C to stop") return True @@ -7332,6 +7498,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # are system-generated and must skip user authorization. is_internal = bool(getattr(event, "internal", False)) + # scale-to-zero (Phase 0, 0.B/F13): stamp the gateway-scoped last-inbound + # clock for real (user-originated) inbound only. Internal/system events + # (background-process completions, startup-restore replays) are NOT + # traffic — counting them would keep a genuinely idle gateway awake. This + # clock is what the idle predicate (gateway/scale_to_zero.is_idle) reads. + if not is_internal: + self._last_inbound_at = time.time() + # Fire pre_gateway_dispatch plugin hook for user-originated messages. # Plugins receive the MessageEvent and may return a dict influencing flow: # {"action": "skip", "reason": ...} -> drop (no reply, plugin handled) diff --git a/gateway/scale_to_zero.py b/gateway/scale_to_zero.py new file mode 100644 index 00000000000..107c45d764e --- /dev/null +++ b/gateway/scale_to_zero.py @@ -0,0 +1,124 @@ +"""Scale-to-zero idle detection + dormant-quiesce for the gateway (Phase 0). + +This is the gateway-side BEHAVIOUR layer that consumes the relay scale-to-zero +PRIMITIVES (gateway-gateway Phase 5: the buffered-flip, the durable per-instance +buffer, the wakeUrl poke, the reconnect supervisor). It owns the *decision* to go +idle and drives the relay transport's ``go_dormant()`` (D12) — it does NOT itself +suspend the machine. On Fly, the now-traffic-idle machine is suspended by +``autostop:"suspend"`` and woken by autostart-on-wakeUrl (decisions.md Q3=C′). + +Design constraints (decisions.md): + - Per-instance enable is gated SOLELY by the NAS "Labs" toggle, carried to the + gateway as the ``HERMES_SCALE_TO_ZERO`` env stamp (D11/Q8=A). NOT a user + config key; ``scale_to_zero.idle_timeout_minutes`` IS config.yaml (D2). + - Arm only when messaging is relay-only or absent (D1/F6) AND a wakeUrl is + registered (§3.4(1)) AND the flag is set. + - Idle = no in-flight agent turn AND no inbound for N min AND no live + background work (D2/D3/F7). + - The quiesce uses ``go_dormant()`` (socket closed + supervisor preserved), + NEVER the stop/restart drain or ``disconnect()`` (F12/F14). The process stays + alive; Fly freezes+resumes it. + - ``mark_resume_pending`` is deliberately NOT called here (D13 — suspend + preserves RAM; revive only if we move to autostop:"stop" or see kills). + +The pure helpers (``parse_idle_timeout_seconds``, ``scale_to_zero_enabled``, +``messaging_is_relay_only_or_absent``, ``is_idle``, ``should_arm``) take plain +inputs so they unit-test without a live gateway. +""" + +from __future__ import annotations + +import os +from typing import Any, Iterable, Optional + +# Env flag stamped by NAS when the scaleToZero Labs toggle is on (D11/Q8=A), +# mirroring how the `relay` feature stamps GATEWAY_RELAY_URL. Truthy values only. +SCALE_TO_ZERO_ENV = "HERMES_SCALE_TO_ZERO" + +# config.yaml default (D2). Behavioural setting -> config, not env. +DEFAULT_IDLE_TIMEOUT_MINUTES = 5 + +_TRUTHY = {"1", "true", "yes", "on"} + + +def scale_to_zero_enabled(environ: Optional[dict] = None) -> bool: + """Whether the per-instance Labs toggle is on (the HERMES_SCALE_TO_ZERO stamp). + + D11/Q8=A: this env flag is the SOLE per-instance enable signal reaching the + gateway. Absent/blank/falsey -> disabled (fail-safe default off). + """ + env = environ if environ is not None else os.environ + return str(env.get(SCALE_TO_ZERO_ENV, "")).strip().lower() in _TRUTHY + + +def parse_idle_timeout_seconds( + cfg_value: Any, default_minutes: int = DEFAULT_IDLE_TIMEOUT_MINUTES +) -> float: + """Coerce ``scale_to_zero.idle_timeout_minutes`` (config.yaml, D2) to seconds. + + Degrades to the default on any non-numeric / non-positive value (never raises, + never returns <= 0 — a zero/negative timeout would make the gateway go dormant + instantly, which is never the intent). + """ + try: + minutes = float(cfg_value) + except (TypeError, ValueError): + minutes = float(default_minutes) + if minutes <= 0: + minutes = float(default_minutes) + return minutes * 60.0 + + +def messaging_is_relay_only_or_absent(platforms: Iterable[Any]) -> bool: + """True iff the only connected messaging platform is RELAY, or there is none + (a Chronos-only / no-platform agent) — the F6/D1 structural precondition. + + A directly-connected platform (Discord/Telegram/Slack/...) holds a live + socket and cannot scale to zero, so its presence disarms the feature. We + compare by the platform's ``.value``/name to avoid importing the enum here + (keeps this module import-light and unit-testable). + """ + names = {_platform_name(p) for p in platforms} + names.discard("relay") + return len(names) == 0 + + +def _platform_name(platform: Any) -> str: + value = getattr(platform, "value", platform) + return str(value).strip().lower() + + +def should_arm( + *, + enabled: bool, + relay_only_or_absent: bool, + wake_url: Optional[str], +) -> bool: + """Whether to start the idle watcher at all (D1/D11/§3.4(1)). + + ALL must hold: the Labs flag is on, messaging is relay-only/absent, and a + wakeUrl is registered (a suspended instance with no reachable wake target is + a black hole — §3.4(1)). Any unmet -> the watcher never starts (no idle + timer, no dormancy), so a non-opted instance behaves exactly as today. + """ + return bool(enabled) and bool(relay_only_or_absent) and bool(wake_url) + + +def is_idle( + *, + running_agent_count: int, + seconds_since_last_inbound: float, + idle_timeout_seconds: float, + has_live_background_work: bool, +) -> bool: + """The idle predicate (D2/D3/F7). Pure — composes the three conjuncts. + + Idle iff: no in-flight agent turn, no inbound within the timeout window, and + no live background work (backgrounded delegate_task / kanban / bg terminal). + Any active work keeps the gateway awake — suspending mid-flight would lose it. + """ + if running_agent_count > 0: + return False + if has_live_background_work: + return False + return seconds_since_last_inbound >= idle_timeout_seconds diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 495d7a7a37f..005742e4d53 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -2523,6 +2523,18 @@ DEFAULT_CONFIG = { # Gateway settings — control how messaging platforms (Telegram, Discord, # Slack, etc.) deliver agent-produced files as native attachments. "gateway": { + # Scale-to-zero idle detection (Phase 0). The gateway watches for idle + # and, when an instance is opted in via the NAS "Labs" toggle (carried as + # the HERMES_SCALE_TO_ZERO env stamp) AND messaging is relay-only/absent + # AND a wakeUrl is registered, drives the relay transport dormant so the + # platform (e.g. Fly autostop:"suspend") can suspend the now-idle machine; + # it wakes on the connector's wakeUrl poke. This is the idle TIMEOUT only + # — whether the feature is enabled at all is the Labs toggle, never a + # config key (decisions.md D2/D11). 0/negative falls back to the default. + "scale_to_zero": { + "idle_timeout_minutes": 5, + }, + # Inject a human-readable timestamp prefix (e.g. # "[Tue 2026-04-28 13:40:53 CEST]") onto user messages IN THE MODEL'S # CONTEXT so the agent has temporal awareness of when each message was diff --git a/tests/gateway/test_scale_to_zero.py b/tests/gateway/test_scale_to_zero.py new file mode 100644 index 00000000000..343a07cbc53 --- /dev/null +++ b/tests/gateway/test_scale_to_zero.py @@ -0,0 +1,144 @@ +"""Unit tests for the scale-to-zero idle-detection pure logic (Phase 0). + +Behaviour-contract tests (AGENTS.md): each conjunct of the idle predicate and +each clause of the arm-gate is exercised independently, not frozen against a +snapshot. The pure helpers in gateway/scale_to_zero.py take plain inputs so they +test without a live gateway. +""" + +from __future__ import annotations + +import pytest + +from gateway.scale_to_zero import ( + DEFAULT_IDLE_TIMEOUT_MINUTES, + SCALE_TO_ZERO_ENV, + is_idle, + messaging_is_relay_only_or_absent, + parse_idle_timeout_seconds, + scale_to_zero_enabled, + should_arm, +) + + +# ── scale_to_zero_enabled (the Labs HERMES_SCALE_TO_ZERO stamp, D11/Q8=A) ──── + + +@pytest.mark.parametrize("value", ["1", "true", "TRUE", "yes", "on", " On "]) +def test_enabled_truthy_values(value): + assert scale_to_zero_enabled({SCALE_TO_ZERO_ENV: value}) is True + + +@pytest.mark.parametrize("value", ["", "0", "false", "no", "off", "nope"]) +def test_enabled_falsey_values(value): + assert scale_to_zero_enabled({SCALE_TO_ZERO_ENV: value}) is False + + +def test_enabled_absent_is_false(): + # Fail-safe default OFF when the stamp is absent (a non-opted instance). + assert scale_to_zero_enabled({}) is False + + +# ── parse_idle_timeout_seconds (config.yaml, D2) ───────────────────────────── + + +def test_timeout_parses_minutes_to_seconds(): + assert parse_idle_timeout_seconds(5) == 300.0 + assert parse_idle_timeout_seconds(10) == 600.0 + assert parse_idle_timeout_seconds("5") == 300.0 + + +@pytest.mark.parametrize("bad", [None, "", "abc", {}, [], object()]) +def test_timeout_degrades_to_default_on_garbage(bad): + assert parse_idle_timeout_seconds(bad) == DEFAULT_IDLE_TIMEOUT_MINUTES * 60.0 + + +@pytest.mark.parametrize("nonpos", [0, -1, -30, "0", "-5"]) +def test_timeout_rejects_nonpositive(nonpos): + # A zero/negative timeout would go dormant instantly — never the intent. + assert parse_idle_timeout_seconds(nonpos) == DEFAULT_IDLE_TIMEOUT_MINUTES * 60.0 + + +# ── messaging_is_relay_only_or_absent (F6/D1) ──────────────────────────────── + + +class _P: + """Stand-in for a Platform enum member with a ``.value``.""" + + def __init__(self, value): + self.value = value + + +def test_relay_only_is_true(): + assert messaging_is_relay_only_or_absent([_P("relay")]) is True + + +def test_no_platform_is_true(): + # A Chronos-only / no-messaging-platform agent can scale to zero. + assert messaging_is_relay_only_or_absent([]) is True + + +def test_direct_socket_platform_disarms(): + assert messaging_is_relay_only_or_absent([_P("discord")]) is False + assert messaging_is_relay_only_or_absent([_P("relay"), _P("telegram")]) is False + + +def test_accepts_bare_strings_too(): + assert messaging_is_relay_only_or_absent(["relay"]) is True + assert messaging_is_relay_only_or_absent(["discord"]) is False + + +# ── should_arm (D1/D11/§3.4(1)) ────────────────────────────────────────────── + + +def test_arm_requires_all_three(): + assert should_arm(enabled=True, relay_only_or_absent=True, wake_url="https://x") is True + + +def test_arm_blocked_when_flag_off(): + assert should_arm(enabled=False, relay_only_or_absent=True, wake_url="https://x") is False + + +def test_arm_blocked_when_direct_socket(): + assert should_arm(enabled=True, relay_only_or_absent=False, wake_url="https://x") is False + + +def test_arm_blocked_without_wake_url(): + # A suspended instance with no wake target is a black hole (§3.4(1)). + assert should_arm(enabled=True, relay_only_or_absent=True, wake_url=None) is False + assert should_arm(enabled=True, relay_only_or_absent=True, wake_url="") is False + + +# ── is_idle (D2/D3/F7) — each conjunct flips the result ────────────────────── + + +def _idle_kwargs(**over): + base = dict( + running_agent_count=0, + seconds_since_last_inbound=600.0, + idle_timeout_seconds=300.0, + has_live_background_work=False, + ) + base.update(over) + return base + + +def test_idle_true_when_all_quiet(): + assert is_idle(**_idle_kwargs()) is True + + +def test_not_idle_with_running_agent(): + assert is_idle(**_idle_kwargs(running_agent_count=1)) is False + + +def test_not_idle_within_timeout_window(): + assert is_idle(**_idle_kwargs(seconds_since_last_inbound=120.0)) is False + + +def test_idle_exactly_at_threshold(): + # >= timeout is idle (boundary). + assert is_idle(**_idle_kwargs(seconds_since_last_inbound=300.0)) is True + + +def test_not_idle_with_live_background_work(): + assert is_idle(**_idle_kwargs(has_live_background_work=True)) is False diff --git a/tests/gateway/test_scale_to_zero_watcher.py b/tests/gateway/test_scale_to_zero_watcher.py new file mode 100644 index 00000000000..5048928b285 --- /dev/null +++ b/tests/gateway/test_scale_to_zero_watcher.py @@ -0,0 +1,120 @@ +"""Watcher-level tests for scale-to-zero: the idle watcher's dormant sequence and +the arm-gate wiring, exercised against the real GatewayRunner methods bound onto +a lightweight stand-in (booting a full gateway is unnecessary for this logic and +would be slow/flaky). + +These cover the parts gateway/test_scale_to_zero.py (pure helpers) can't: that +the watcher calls the relay adapter's go_dormant() exactly when idle+armed, +respects the cooldown, and skips when busy — the F7/D3 + D12 behaviour. +""" + +from __future__ import annotations + +import asyncio +import time + +import pytest + +from gateway.run import GatewayRunner + + +class _FakeRelayAdapter: + def __init__(self): + self.go_dormant_calls = 0 + + async def go_dormant(self): + self.go_dormant_calls += 1 + return True + + +def _runner_with(monkeypatch, *, idle, armed_adapter=True): + """Build a GatewayRunner without booting it, stubbing just what the watcher + touches. Real methods (_scale_to_zero_is_idle composition, the watcher body) + run; only their dependencies are stubbed.""" + r = GatewayRunner.__new__(GatewayRunner) + r._running = True + r._scale_to_zero_cooldown_until = 0.0 + r._last_inbound_at = time.time() + r._running_agents = {} + r._background_tasks = set() + adapter = _FakeRelayAdapter() if armed_adapter else None + + monkeypatch.setattr(r, "_scale_to_zero_is_idle", lambda: idle, raising=False) + monkeypatch.setattr(r, "_relay_adapter_for_dormancy", lambda: adapter, raising=False) + monkeypatch.setattr(r, "_scale_to_zero_idle_timeout_seconds", lambda: 300.0, raising=False) + monkeypatch.setattr(r, "_update_runtime_status", lambda *a, **k: None, raising=False) + return r, adapter + + +@pytest.mark.asyncio +async def test_watcher_goes_dormant_when_idle(monkeypatch): + r, adapter = _runner_with(monkeypatch, idle=True) + # Run one iteration: stop after the first sleep so the loop exits cleanly. + task = asyncio.create_task(r._scale_to_zero_watcher(interval=0.01)) + await asyncio.sleep(0.1) + r._running = False + await asyncio.wait_for(task, timeout=2) + assert adapter.go_dormant_calls >= 1 + # After driving dormant, a re-arm cooldown is set (0.F). + assert r._scale_to_zero_cooldown_until > time.time() + + +@pytest.mark.asyncio +async def test_watcher_does_not_go_dormant_when_busy(monkeypatch): + r, adapter = _runner_with(monkeypatch, idle=False) + task = asyncio.create_task(r._scale_to_zero_watcher(interval=0.01)) + await asyncio.sleep(0.1) + r._running = False + await asyncio.wait_for(task, timeout=2) + assert adapter.go_dormant_calls == 0 + + +@pytest.mark.asyncio +async def test_watcher_respects_cooldown(monkeypatch): + r, adapter = _runner_with(monkeypatch, idle=True) + # Cooldown active far in the future: even though idle, no dormancy fires. + r._scale_to_zero_cooldown_until = time.time() + 3600 + task = asyncio.create_task(r._scale_to_zero_watcher(interval=0.01)) + await asyncio.sleep(0.1) + r._running = False + await asyncio.wait_for(task, timeout=2) + assert adapter.go_dormant_calls == 0 + + +@pytest.mark.asyncio +async def test_watcher_noop_when_no_relay_adapter(monkeypatch): + # Armed-but-no-relay-adapter (e.g. relay not yet connected): must not crash. + r, _ = _runner_with(monkeypatch, idle=True, armed_adapter=False) + task = asyncio.create_task(r._scale_to_zero_watcher(interval=0.01)) + await asyncio.sleep(0.1) + r._running = False + await asyncio.wait_for(task, timeout=2) + # No exception, loop exits cleanly — nothing to assert beyond survival. + + +def test_bg_work_blocks_idle_via_background_tasks(monkeypatch): + """_scale_to_zero_has_live_background_work() reports True when a tracked + background task is still live (D3/F7) — the guard that keeps a gateway with + an in-flight backgrounded subagent/terminal awake.""" + r = GatewayRunner.__new__(GatewayRunner) + + async def _never(): + await asyncio.sleep(3600) + + loop = asyncio.new_event_loop() + try: + t = loop.create_task(_never()) + r._background_tasks = {t} + # process_registry has nothing active in this fresh process. + assert r._scale_to_zero_has_live_background_work() is True + t.cancel() + finally: + loop.run_until_complete(asyncio.gather(t, return_exceptions=True)) + loop.close() + + +def test_bg_work_false_when_quiet(): + r = GatewayRunner.__new__(GatewayRunner) + r._background_tasks = set() + # No background tasks, no active processes in this fresh process. + assert r._scale_to_zero_has_live_background_work() is False diff --git a/tools/process_registry.py b/tools/process_registry.py index 1ed658a92f2..69bfc13619d 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -1581,6 +1581,23 @@ class ProcessRegistry: for s in self._running.values() ) + def has_any_active(self) -> bool: + """Whether ANY background process is still running (across all sessions). + + Used by scale-to-zero idle detection (gateway/scale_to_zero): a gateway + with a live background process (terminal background=true) is NOT idle and + must not be suspended, or the process is lost. Refreshes detached + sessions first so a finished-but-unreaped process reads as inactive. + """ + with self._lock: + sessions = list(self._running.values()) + + for session in sessions: + self._refresh_detached_session(session) + + with self._lock: + return any(not s.exited for s in self._running.values()) + def kill_all(self, task_id: str = None) -> int: """Kill all running processes, optionally filtered by task_id. Returns count killed.""" with self._lock: