mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-27 11:22:03 +00:00
feat(gateway): scale-to-zero idle detection + dormant-quiesce (Phase 0)
The gateway-side BEHAVIOUR layer that consumes the relay scale-to-zero primitives (gateway-gateway Phase 5): the gateway decides it is idle and drives the relay transport dormant so the platform (Fly autostop:"suspend") can suspend the now-traffic-idle machine, which wakes on the connector's wakeUrl poke (decisions.md Q3=C', D1-D13). - gateway/scale_to_zero.py: pure helpers — scale_to_zero_enabled (the NAS Labs HERMES_SCALE_TO_ZERO stamp, D11/Q8=A), parse_idle_timeout_seconds (config.yaml gateway.scale_to_zero.idle_timeout_minutes, D2), messaging_is_relay_only_or_absent (F6/D1), should_arm (D1/D11/§3.4(1)), is_idle (D2/D3/F7). - gateway/run.py: _last_inbound_at clock stamped on user inbound in _handle_message (F13); the arm-gate + idle predicate + the _scale_to_zero_watcher dormant sequence (mark draining -> adapter go_dormant() -> cooldown), started only when armed. Deliberately NOT the stop path and NOT mark_resume_pending (F12/D13). - tools/process_registry.py: has_any_active() for the bg-work guard (D3/F7). - hermes_cli/config.py: gateway.scale_to_zero.idle_timeout_minutes default 5. Tests: 38 pure-logic + 6 watcher (incl. bg-work regression guard proven RED). Full relay + scale-to-zero suites: 184 passed. The 20 unrelated failures in the broader run are PRE-EXISTING on origin/main (custom-provider/tools tests), confirmed via a pristine baseline worktree.
This commit is contained in:
parent
96af4bec30
commit
d1cac0e5ef
6 changed files with 591 additions and 0 deletions
174
gateway/run.py
174
gateway/run.py
|
|
@ -2783,6 +2783,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
# Track background tasks to prevent garbage collection mid-execution
|
||||
self._background_tasks: set = set()
|
||||
|
||||
# scale-to-zero (Phase 0, F13): gateway-scoped "last inbound seen" clock.
|
||||
# There is no such clock today (only a per-agent _last_activity_ts), so the
|
||||
# idle predicate needs this. Stamped in _handle_message (the single inbound
|
||||
# chokepoint all adapters call); seeded to "now" so a fresh gateway isn't
|
||||
# considered idle from epoch. The scale-to-zero watcher (started only when
|
||||
# the instance is opted in + relay-only + has a wakeUrl) reads it.
|
||||
self._last_inbound_at: float = time.time()
|
||||
# Set after a wake (re-arm cooldown, 0.F) so we don't immediately re-go
|
||||
# dormant before the drained backlog has a chance to update the clock.
|
||||
self._scale_to_zero_cooldown_until: float = 0.0
|
||||
|
||||
|
||||
def _wire_teams_pipeline_runtime(self) -> None:
|
||||
"""Bind the Teams meeting pipeline runtime to Graph webhook ingress.
|
||||
|
|
@ -3568,6 +3579,145 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
def _running_agent_count(self) -> int:
|
||||
return len(self._running_agents)
|
||||
|
||||
# ── scale-to-zero idle detection / dormant-quiesce (Phase 0) ──────────────
|
||||
# The gateway-side BEHAVIOUR that consumes the relay scale-to-zero primitives
|
||||
# (gateway-gateway Phase 5). Pure logic lives in gateway/scale_to_zero.py; the
|
||||
# methods here bind it to the live runner/transport. See ~/nous/specs/
|
||||
# scale-to-zero (decisions.md) for the design + the F12/F14 distinctions.
|
||||
|
||||
def _scale_to_zero_has_live_background_work(self) -> bool:
|
||||
"""Live background work that must block a suspend (D3/F7).
|
||||
|
||||
Backgrounded delegate_task / kanban / terminal(background=true) are NOT
|
||||
counted by _running_agent_count(), but suspending mid-flight loses them.
|
||||
Checks the runner's own tracked tasks + the process registry's running
|
||||
processes + any pending process-completion watchers.
|
||||
"""
|
||||
if any(not t.done() for t in self._background_tasks):
|
||||
return True
|
||||
try:
|
||||
from tools.process_registry import process_registry
|
||||
|
||||
if process_registry.has_any_active():
|
||||
return True
|
||||
if process_registry.pending_watchers:
|
||||
return True
|
||||
except Exception: # noqa: BLE001 - never let the idle check raise
|
||||
logger.debug("scale-to-zero bg-work check failed", exc_info=True)
|
||||
return False
|
||||
|
||||
def _scale_to_zero_idle_timeout_seconds(self) -> float:
|
||||
from gateway.scale_to_zero import parse_idle_timeout_seconds
|
||||
|
||||
raw = None
|
||||
try:
|
||||
user_cfg = _load_gateway_config()
|
||||
gw = user_cfg.get("gateway") if isinstance(user_cfg, dict) else None
|
||||
stz = gw.get("scale_to_zero") if isinstance(gw, dict) else None
|
||||
if isinstance(stz, dict):
|
||||
raw = stz.get("idle_timeout_minutes")
|
||||
except Exception: # noqa: BLE001
|
||||
raw = None
|
||||
return parse_idle_timeout_seconds(raw)
|
||||
|
||||
def _scale_to_zero_should_arm(self) -> bool:
|
||||
"""Whether to start the idle watcher (D1/D11/§3.4(1))."""
|
||||
from gateway.relay import relay_wake_url
|
||||
from gateway.scale_to_zero import (
|
||||
messaging_is_relay_only_or_absent,
|
||||
scale_to_zero_enabled,
|
||||
should_arm,
|
||||
)
|
||||
|
||||
try:
|
||||
platforms = list(self.config.platforms.keys()) if self.config else []
|
||||
except Exception: # noqa: BLE001
|
||||
platforms = []
|
||||
try:
|
||||
wake_url = relay_wake_url()
|
||||
except Exception: # noqa: BLE001
|
||||
wake_url = None
|
||||
return should_arm(
|
||||
enabled=scale_to_zero_enabled(),
|
||||
relay_only_or_absent=messaging_is_relay_only_or_absent(platforms),
|
||||
wake_url=wake_url,
|
||||
)
|
||||
|
||||
def _scale_to_zero_is_idle(self) -> bool:
|
||||
from gateway.scale_to_zero import is_idle
|
||||
|
||||
return is_idle(
|
||||
running_agent_count=self._running_agent_count(),
|
||||
seconds_since_last_inbound=time.time() - self._last_inbound_at,
|
||||
idle_timeout_seconds=self._scale_to_zero_idle_timeout_seconds(),
|
||||
has_live_background_work=self._scale_to_zero_has_live_background_work(),
|
||||
)
|
||||
|
||||
def _relay_adapter_for_dormancy(self):
|
||||
"""Return the connected RELAY adapter, if any (the one go_dormant targets)."""
|
||||
try:
|
||||
from gateway.platforms.base import Platform
|
||||
except Exception: # noqa: BLE001
|
||||
return None
|
||||
return self.adapters.get(Platform.RELAY)
|
||||
|
||||
async def _scale_to_zero_watcher(self, interval: float = 30.0) -> None:
|
||||
"""Watch for idle and drive the relay dormant so the platform can suspend.
|
||||
|
||||
Started ONLY when _scale_to_zero_should_arm() (opted in via the Labs
|
||||
HERMES_SCALE_TO_ZERO stamp + relay-only/absent messaging + a wakeUrl).
|
||||
On a sustained idle window it runs the DORMANT sequence (D12/F12/F14):
|
||||
- mark runtime status `draining` (composes with the existing state
|
||||
machine, §3.4(6); does NOT set _running=False),
|
||||
- relay adapter.go_dormant() — going_idle->ack + supervisor-preserving
|
||||
socket close (NOT disconnect(), NOT the run.py stop path),
|
||||
- deliberately NO mark_resume_pending (D13 — suspend preserves RAM).
|
||||
The process stays alive; the platform (Fly autostop:"suspend") suspends
|
||||
the now-traffic-idle machine and autostart wakes it on the wakeUrl poke,
|
||||
at which point the preserved reconnect supervisor re-dials and the
|
||||
connector drains the buffered backlog. After driving dormant we set a
|
||||
re-arm cooldown so a wake's drained backlog isn't immediately re-quiesced.
|
||||
"""
|
||||
await asyncio.sleep(min(interval, 30.0)) # let startup settle
|
||||
while self._running:
|
||||
try:
|
||||
await asyncio.sleep(interval)
|
||||
if not self._running:
|
||||
return
|
||||
if time.time() < self._scale_to_zero_cooldown_until:
|
||||
continue
|
||||
if not self._scale_to_zero_is_idle():
|
||||
continue
|
||||
adapter = self._relay_adapter_for_dormancy()
|
||||
if adapter is None:
|
||||
continue
|
||||
go_dormant = getattr(adapter, "go_dormant", None)
|
||||
if not callable(go_dormant):
|
||||
continue
|
||||
logger.info(
|
||||
"scale-to-zero: gateway idle for >= %.0fs — going dormant "
|
||||
"(relay buffered, socket closed, awaiting platform suspend)",
|
||||
self._scale_to_zero_idle_timeout_seconds(),
|
||||
)
|
||||
try:
|
||||
self._update_runtime_status("draining")
|
||||
except Exception: # noqa: BLE001 - status is best-effort
|
||||
logger.debug("scale-to-zero: status mark failed", exc_info=True)
|
||||
try:
|
||||
result = go_dormant()
|
||||
if asyncio.iscoroutine(result):
|
||||
await result
|
||||
except Exception: # noqa: BLE001 - dormancy is best-effort
|
||||
logger.debug("scale-to-zero: go_dormant failed", exc_info=True)
|
||||
# 0.F: after a wake the drained inbound updates _last_inbound_at,
|
||||
# but give it a window so we don't immediately re-go-dormant on the
|
||||
# same idle reading before traffic lands.
|
||||
self._scale_to_zero_cooldown_until = time.time() + max(interval, 60.0)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception: # noqa: BLE001 - the watcher must never crash the gateway
|
||||
logger.debug("scale-to-zero watcher iteration error", exc_info=True)
|
||||
|
||||
def _status_action_label(self) -> str:
|
||||
return "restart" if self._restart_requested else "shutdown"
|
||||
|
||||
|
|
@ -5965,6 +6115,22 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
# idle case where the subagent finishes with no agent turn running.
|
||||
asyncio.create_task(self._async_delegation_watcher())
|
||||
|
||||
# Start the scale-to-zero idle watcher ONLY when this instance is opted
|
||||
# in (the NAS "Labs" HERMES_SCALE_TO_ZERO stamp), messaging is
|
||||
# relay-only/absent, and a wakeUrl is registered (decisions.md D1/D11/
|
||||
# §3.4(1)). A non-opted instance never starts it, so behaviour is exactly
|
||||
# as today. When armed, the watcher drives the relay dormant on sustained
|
||||
# idle so the platform (Fly autostop:"suspend") can suspend the machine.
|
||||
try:
|
||||
if self._scale_to_zero_should_arm():
|
||||
logger.info(
|
||||
"scale-to-zero: armed (idle timeout %.0fs) — watching for idle",
|
||||
self._scale_to_zero_idle_timeout_seconds(),
|
||||
)
|
||||
asyncio.create_task(self._scale_to_zero_watcher())
|
||||
except Exception: # noqa: BLE001 - arming must never block startup
|
||||
logger.debug("scale-to-zero: arm check failed at startup", exc_info=True)
|
||||
|
||||
logger.info("Press Ctrl+C to stop")
|
||||
|
||||
return True
|
||||
|
|
@ -7332,6 +7498,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
# are system-generated and must skip user authorization.
|
||||
is_internal = bool(getattr(event, "internal", False))
|
||||
|
||||
# scale-to-zero (Phase 0, 0.B/F13): stamp the gateway-scoped last-inbound
|
||||
# clock for real (user-originated) inbound only. Internal/system events
|
||||
# (background-process completions, startup-restore replays) are NOT
|
||||
# traffic — counting them would keep a genuinely idle gateway awake. This
|
||||
# clock is what the idle predicate (gateway/scale_to_zero.is_idle) reads.
|
||||
if not is_internal:
|
||||
self._last_inbound_at = time.time()
|
||||
|
||||
# Fire pre_gateway_dispatch plugin hook for user-originated messages.
|
||||
# Plugins receive the MessageEvent and may return a dict influencing flow:
|
||||
# {"action": "skip", "reason": ...} -> drop (no reply, plugin handled)
|
||||
|
|
|
|||
124
gateway/scale_to_zero.py
Normal file
124
gateway/scale_to_zero.py
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
"""Scale-to-zero idle detection + dormant-quiesce for the gateway (Phase 0).
|
||||
|
||||
This is the gateway-side BEHAVIOUR layer that consumes the relay scale-to-zero
|
||||
PRIMITIVES (gateway-gateway Phase 5: the buffered-flip, the durable per-instance
|
||||
buffer, the wakeUrl poke, the reconnect supervisor). It owns the *decision* to go
|
||||
idle and drives the relay transport's ``go_dormant()`` (D12) — it does NOT itself
|
||||
suspend the machine. On Fly, the now-traffic-idle machine is suspended by
|
||||
``autostop:"suspend"`` and woken by autostart-on-wakeUrl (decisions.md Q3=C′).
|
||||
|
||||
Design constraints (decisions.md):
|
||||
- Per-instance enable is gated SOLELY by the NAS "Labs" toggle, carried to the
|
||||
gateway as the ``HERMES_SCALE_TO_ZERO`` env stamp (D11/Q8=A). NOT a user
|
||||
config key; ``scale_to_zero.idle_timeout_minutes`` IS config.yaml (D2).
|
||||
- Arm only when messaging is relay-only or absent (D1/F6) AND a wakeUrl is
|
||||
registered (§3.4(1)) AND the flag is set.
|
||||
- Idle = no in-flight agent turn AND no inbound for N min AND no live
|
||||
background work (D2/D3/F7).
|
||||
- The quiesce uses ``go_dormant()`` (socket closed + supervisor preserved),
|
||||
NEVER the stop/restart drain or ``disconnect()`` (F12/F14). The process stays
|
||||
alive; Fly freezes+resumes it.
|
||||
- ``mark_resume_pending`` is deliberately NOT called here (D13 — suspend
|
||||
preserves RAM; revive only if we move to autostop:"stop" or see kills).
|
||||
|
||||
The pure helpers (``parse_idle_timeout_seconds``, ``scale_to_zero_enabled``,
|
||||
``messaging_is_relay_only_or_absent``, ``is_idle``, ``should_arm``) take plain
|
||||
inputs so they unit-test without a live gateway.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from typing import Any, Iterable, Optional
|
||||
|
||||
# Env flag stamped by NAS when the scaleToZero Labs toggle is on (D11/Q8=A),
|
||||
# mirroring how the `relay` feature stamps GATEWAY_RELAY_URL. Truthy values only.
|
||||
SCALE_TO_ZERO_ENV = "HERMES_SCALE_TO_ZERO"
|
||||
|
||||
# config.yaml default (D2). Behavioural setting -> config, not env.
|
||||
DEFAULT_IDLE_TIMEOUT_MINUTES = 5
|
||||
|
||||
_TRUTHY = {"1", "true", "yes", "on"}
|
||||
|
||||
|
||||
def scale_to_zero_enabled(environ: Optional[dict] = None) -> bool:
|
||||
"""Whether the per-instance Labs toggle is on (the HERMES_SCALE_TO_ZERO stamp).
|
||||
|
||||
D11/Q8=A: this env flag is the SOLE per-instance enable signal reaching the
|
||||
gateway. Absent/blank/falsey -> disabled (fail-safe default off).
|
||||
"""
|
||||
env = environ if environ is not None else os.environ
|
||||
return str(env.get(SCALE_TO_ZERO_ENV, "")).strip().lower() in _TRUTHY
|
||||
|
||||
|
||||
def parse_idle_timeout_seconds(
|
||||
cfg_value: Any, default_minutes: int = DEFAULT_IDLE_TIMEOUT_MINUTES
|
||||
) -> float:
|
||||
"""Coerce ``scale_to_zero.idle_timeout_minutes`` (config.yaml, D2) to seconds.
|
||||
|
||||
Degrades to the default on any non-numeric / non-positive value (never raises,
|
||||
never returns <= 0 — a zero/negative timeout would make the gateway go dormant
|
||||
instantly, which is never the intent).
|
||||
"""
|
||||
try:
|
||||
minutes = float(cfg_value)
|
||||
except (TypeError, ValueError):
|
||||
minutes = float(default_minutes)
|
||||
if minutes <= 0:
|
||||
minutes = float(default_minutes)
|
||||
return minutes * 60.0
|
||||
|
||||
|
||||
def messaging_is_relay_only_or_absent(platforms: Iterable[Any]) -> bool:
|
||||
"""True iff the only connected messaging platform is RELAY, or there is none
|
||||
(a Chronos-only / no-platform agent) — the F6/D1 structural precondition.
|
||||
|
||||
A directly-connected platform (Discord/Telegram/Slack/...) holds a live
|
||||
socket and cannot scale to zero, so its presence disarms the feature. We
|
||||
compare by the platform's ``.value``/name to avoid importing the enum here
|
||||
(keeps this module import-light and unit-testable).
|
||||
"""
|
||||
names = {_platform_name(p) for p in platforms}
|
||||
names.discard("relay")
|
||||
return len(names) == 0
|
||||
|
||||
|
||||
def _platform_name(platform: Any) -> str:
|
||||
value = getattr(platform, "value", platform)
|
||||
return str(value).strip().lower()
|
||||
|
||||
|
||||
def should_arm(
|
||||
*,
|
||||
enabled: bool,
|
||||
relay_only_or_absent: bool,
|
||||
wake_url: Optional[str],
|
||||
) -> bool:
|
||||
"""Whether to start the idle watcher at all (D1/D11/§3.4(1)).
|
||||
|
||||
ALL must hold: the Labs flag is on, messaging is relay-only/absent, and a
|
||||
wakeUrl is registered (a suspended instance with no reachable wake target is
|
||||
a black hole — §3.4(1)). Any unmet -> the watcher never starts (no idle
|
||||
timer, no dormancy), so a non-opted instance behaves exactly as today.
|
||||
"""
|
||||
return bool(enabled) and bool(relay_only_or_absent) and bool(wake_url)
|
||||
|
||||
|
||||
def is_idle(
|
||||
*,
|
||||
running_agent_count: int,
|
||||
seconds_since_last_inbound: float,
|
||||
idle_timeout_seconds: float,
|
||||
has_live_background_work: bool,
|
||||
) -> bool:
|
||||
"""The idle predicate (D2/D3/F7). Pure — composes the three conjuncts.
|
||||
|
||||
Idle iff: no in-flight agent turn, no inbound within the timeout window, and
|
||||
no live background work (backgrounded delegate_task / kanban / bg terminal).
|
||||
Any active work keeps the gateway awake — suspending mid-flight would lose it.
|
||||
"""
|
||||
if running_agent_count > 0:
|
||||
return False
|
||||
if has_live_background_work:
|
||||
return False
|
||||
return seconds_since_last_inbound >= idle_timeout_seconds
|
||||
|
|
@ -2523,6 +2523,18 @@ DEFAULT_CONFIG = {
|
|||
# Gateway settings — control how messaging platforms (Telegram, Discord,
|
||||
# Slack, etc.) deliver agent-produced files as native attachments.
|
||||
"gateway": {
|
||||
# Scale-to-zero idle detection (Phase 0). The gateway watches for idle
|
||||
# and, when an instance is opted in via the NAS "Labs" toggle (carried as
|
||||
# the HERMES_SCALE_TO_ZERO env stamp) AND messaging is relay-only/absent
|
||||
# AND a wakeUrl is registered, drives the relay transport dormant so the
|
||||
# platform (e.g. Fly autostop:"suspend") can suspend the now-idle machine;
|
||||
# it wakes on the connector's wakeUrl poke. This is the idle TIMEOUT only
|
||||
# — whether the feature is enabled at all is the Labs toggle, never a
|
||||
# config key (decisions.md D2/D11). 0/negative falls back to the default.
|
||||
"scale_to_zero": {
|
||||
"idle_timeout_minutes": 5,
|
||||
},
|
||||
|
||||
# Inject a human-readable timestamp prefix (e.g.
|
||||
# "[Tue 2026-04-28 13:40:53 CEST]") onto user messages IN THE MODEL'S
|
||||
# CONTEXT so the agent has temporal awareness of when each message was
|
||||
|
|
|
|||
144
tests/gateway/test_scale_to_zero.py
Normal file
144
tests/gateway/test_scale_to_zero.py
Normal file
|
|
@ -0,0 +1,144 @@
|
|||
"""Unit tests for the scale-to-zero idle-detection pure logic (Phase 0).
|
||||
|
||||
Behaviour-contract tests (AGENTS.md): each conjunct of the idle predicate and
|
||||
each clause of the arm-gate is exercised independently, not frozen against a
|
||||
snapshot. The pure helpers in gateway/scale_to_zero.py take plain inputs so they
|
||||
test without a live gateway.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.scale_to_zero import (
|
||||
DEFAULT_IDLE_TIMEOUT_MINUTES,
|
||||
SCALE_TO_ZERO_ENV,
|
||||
is_idle,
|
||||
messaging_is_relay_only_or_absent,
|
||||
parse_idle_timeout_seconds,
|
||||
scale_to_zero_enabled,
|
||||
should_arm,
|
||||
)
|
||||
|
||||
|
||||
# ── scale_to_zero_enabled (the Labs HERMES_SCALE_TO_ZERO stamp, D11/Q8=A) ────
|
||||
|
||||
|
||||
@pytest.mark.parametrize("value", ["1", "true", "TRUE", "yes", "on", " On "])
|
||||
def test_enabled_truthy_values(value):
|
||||
assert scale_to_zero_enabled({SCALE_TO_ZERO_ENV: value}) is True
|
||||
|
||||
|
||||
@pytest.mark.parametrize("value", ["", "0", "false", "no", "off", "nope"])
|
||||
def test_enabled_falsey_values(value):
|
||||
assert scale_to_zero_enabled({SCALE_TO_ZERO_ENV: value}) is False
|
||||
|
||||
|
||||
def test_enabled_absent_is_false():
|
||||
# Fail-safe default OFF when the stamp is absent (a non-opted instance).
|
||||
assert scale_to_zero_enabled({}) is False
|
||||
|
||||
|
||||
# ── parse_idle_timeout_seconds (config.yaml, D2) ─────────────────────────────
|
||||
|
||||
|
||||
def test_timeout_parses_minutes_to_seconds():
|
||||
assert parse_idle_timeout_seconds(5) == 300.0
|
||||
assert parse_idle_timeout_seconds(10) == 600.0
|
||||
assert parse_idle_timeout_seconds("5") == 300.0
|
||||
|
||||
|
||||
@pytest.mark.parametrize("bad", [None, "", "abc", {}, [], object()])
|
||||
def test_timeout_degrades_to_default_on_garbage(bad):
|
||||
assert parse_idle_timeout_seconds(bad) == DEFAULT_IDLE_TIMEOUT_MINUTES * 60.0
|
||||
|
||||
|
||||
@pytest.mark.parametrize("nonpos", [0, -1, -30, "0", "-5"])
|
||||
def test_timeout_rejects_nonpositive(nonpos):
|
||||
# A zero/negative timeout would go dormant instantly — never the intent.
|
||||
assert parse_idle_timeout_seconds(nonpos) == DEFAULT_IDLE_TIMEOUT_MINUTES * 60.0
|
||||
|
||||
|
||||
# ── messaging_is_relay_only_or_absent (F6/D1) ────────────────────────────────
|
||||
|
||||
|
||||
class _P:
|
||||
"""Stand-in for a Platform enum member with a ``.value``."""
|
||||
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
|
||||
def test_relay_only_is_true():
|
||||
assert messaging_is_relay_only_or_absent([_P("relay")]) is True
|
||||
|
||||
|
||||
def test_no_platform_is_true():
|
||||
# A Chronos-only / no-messaging-platform agent can scale to zero.
|
||||
assert messaging_is_relay_only_or_absent([]) is True
|
||||
|
||||
|
||||
def test_direct_socket_platform_disarms():
|
||||
assert messaging_is_relay_only_or_absent([_P("discord")]) is False
|
||||
assert messaging_is_relay_only_or_absent([_P("relay"), _P("telegram")]) is False
|
||||
|
||||
|
||||
def test_accepts_bare_strings_too():
|
||||
assert messaging_is_relay_only_or_absent(["relay"]) is True
|
||||
assert messaging_is_relay_only_or_absent(["discord"]) is False
|
||||
|
||||
|
||||
# ── should_arm (D1/D11/§3.4(1)) ──────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_arm_requires_all_three():
|
||||
assert should_arm(enabled=True, relay_only_or_absent=True, wake_url="https://x") is True
|
||||
|
||||
|
||||
def test_arm_blocked_when_flag_off():
|
||||
assert should_arm(enabled=False, relay_only_or_absent=True, wake_url="https://x") is False
|
||||
|
||||
|
||||
def test_arm_blocked_when_direct_socket():
|
||||
assert should_arm(enabled=True, relay_only_or_absent=False, wake_url="https://x") is False
|
||||
|
||||
|
||||
def test_arm_blocked_without_wake_url():
|
||||
# A suspended instance with no wake target is a black hole (§3.4(1)).
|
||||
assert should_arm(enabled=True, relay_only_or_absent=True, wake_url=None) is False
|
||||
assert should_arm(enabled=True, relay_only_or_absent=True, wake_url="") is False
|
||||
|
||||
|
||||
# ── is_idle (D2/D3/F7) — each conjunct flips the result ──────────────────────
|
||||
|
||||
|
||||
def _idle_kwargs(**over):
|
||||
base = dict(
|
||||
running_agent_count=0,
|
||||
seconds_since_last_inbound=600.0,
|
||||
idle_timeout_seconds=300.0,
|
||||
has_live_background_work=False,
|
||||
)
|
||||
base.update(over)
|
||||
return base
|
||||
|
||||
|
||||
def test_idle_true_when_all_quiet():
|
||||
assert is_idle(**_idle_kwargs()) is True
|
||||
|
||||
|
||||
def test_not_idle_with_running_agent():
|
||||
assert is_idle(**_idle_kwargs(running_agent_count=1)) is False
|
||||
|
||||
|
||||
def test_not_idle_within_timeout_window():
|
||||
assert is_idle(**_idle_kwargs(seconds_since_last_inbound=120.0)) is False
|
||||
|
||||
|
||||
def test_idle_exactly_at_threshold():
|
||||
# >= timeout is idle (boundary).
|
||||
assert is_idle(**_idle_kwargs(seconds_since_last_inbound=300.0)) is True
|
||||
|
||||
|
||||
def test_not_idle_with_live_background_work():
|
||||
assert is_idle(**_idle_kwargs(has_live_background_work=True)) is False
|
||||
120
tests/gateway/test_scale_to_zero_watcher.py
Normal file
120
tests/gateway/test_scale_to_zero_watcher.py
Normal file
|
|
@ -0,0 +1,120 @@
|
|||
"""Watcher-level tests for scale-to-zero: the idle watcher's dormant sequence and
|
||||
the arm-gate wiring, exercised against the real GatewayRunner methods bound onto
|
||||
a lightweight stand-in (booting a full gateway is unnecessary for this logic and
|
||||
would be slow/flaky).
|
||||
|
||||
These cover the parts gateway/test_scale_to_zero.py (pure helpers) can't: that
|
||||
the watcher calls the relay adapter's go_dormant() exactly when idle+armed,
|
||||
respects the cooldown, and skips when busy — the F7/D3 + D12 behaviour.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.run import GatewayRunner
|
||||
|
||||
|
||||
class _FakeRelayAdapter:
|
||||
def __init__(self):
|
||||
self.go_dormant_calls = 0
|
||||
|
||||
async def go_dormant(self):
|
||||
self.go_dormant_calls += 1
|
||||
return True
|
||||
|
||||
|
||||
def _runner_with(monkeypatch, *, idle, armed_adapter=True):
|
||||
"""Build a GatewayRunner without booting it, stubbing just what the watcher
|
||||
touches. Real methods (_scale_to_zero_is_idle composition, the watcher body)
|
||||
run; only their dependencies are stubbed."""
|
||||
r = GatewayRunner.__new__(GatewayRunner)
|
||||
r._running = True
|
||||
r._scale_to_zero_cooldown_until = 0.0
|
||||
r._last_inbound_at = time.time()
|
||||
r._running_agents = {}
|
||||
r._background_tasks = set()
|
||||
adapter = _FakeRelayAdapter() if armed_adapter else None
|
||||
|
||||
monkeypatch.setattr(r, "_scale_to_zero_is_idle", lambda: idle, raising=False)
|
||||
monkeypatch.setattr(r, "_relay_adapter_for_dormancy", lambda: adapter, raising=False)
|
||||
monkeypatch.setattr(r, "_scale_to_zero_idle_timeout_seconds", lambda: 300.0, raising=False)
|
||||
monkeypatch.setattr(r, "_update_runtime_status", lambda *a, **k: None, raising=False)
|
||||
return r, adapter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_watcher_goes_dormant_when_idle(monkeypatch):
|
||||
r, adapter = _runner_with(monkeypatch, idle=True)
|
||||
# Run one iteration: stop after the first sleep so the loop exits cleanly.
|
||||
task = asyncio.create_task(r._scale_to_zero_watcher(interval=0.01))
|
||||
await asyncio.sleep(0.1)
|
||||
r._running = False
|
||||
await asyncio.wait_for(task, timeout=2)
|
||||
assert adapter.go_dormant_calls >= 1
|
||||
# After driving dormant, a re-arm cooldown is set (0.F).
|
||||
assert r._scale_to_zero_cooldown_until > time.time()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_watcher_does_not_go_dormant_when_busy(monkeypatch):
|
||||
r, adapter = _runner_with(monkeypatch, idle=False)
|
||||
task = asyncio.create_task(r._scale_to_zero_watcher(interval=0.01))
|
||||
await asyncio.sleep(0.1)
|
||||
r._running = False
|
||||
await asyncio.wait_for(task, timeout=2)
|
||||
assert adapter.go_dormant_calls == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_watcher_respects_cooldown(monkeypatch):
|
||||
r, adapter = _runner_with(monkeypatch, idle=True)
|
||||
# Cooldown active far in the future: even though idle, no dormancy fires.
|
||||
r._scale_to_zero_cooldown_until = time.time() + 3600
|
||||
task = asyncio.create_task(r._scale_to_zero_watcher(interval=0.01))
|
||||
await asyncio.sleep(0.1)
|
||||
r._running = False
|
||||
await asyncio.wait_for(task, timeout=2)
|
||||
assert adapter.go_dormant_calls == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_watcher_noop_when_no_relay_adapter(monkeypatch):
|
||||
# Armed-but-no-relay-adapter (e.g. relay not yet connected): must not crash.
|
||||
r, _ = _runner_with(monkeypatch, idle=True, armed_adapter=False)
|
||||
task = asyncio.create_task(r._scale_to_zero_watcher(interval=0.01))
|
||||
await asyncio.sleep(0.1)
|
||||
r._running = False
|
||||
await asyncio.wait_for(task, timeout=2)
|
||||
# No exception, loop exits cleanly — nothing to assert beyond survival.
|
||||
|
||||
|
||||
def test_bg_work_blocks_idle_via_background_tasks(monkeypatch):
|
||||
"""_scale_to_zero_has_live_background_work() reports True when a tracked
|
||||
background task is still live (D3/F7) — the guard that keeps a gateway with
|
||||
an in-flight backgrounded subagent/terminal awake."""
|
||||
r = GatewayRunner.__new__(GatewayRunner)
|
||||
|
||||
async def _never():
|
||||
await asyncio.sleep(3600)
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
try:
|
||||
t = loop.create_task(_never())
|
||||
r._background_tasks = {t}
|
||||
# process_registry has nothing active in this fresh process.
|
||||
assert r._scale_to_zero_has_live_background_work() is True
|
||||
t.cancel()
|
||||
finally:
|
||||
loop.run_until_complete(asyncio.gather(t, return_exceptions=True))
|
||||
loop.close()
|
||||
|
||||
|
||||
def test_bg_work_false_when_quiet():
|
||||
r = GatewayRunner.__new__(GatewayRunner)
|
||||
r._background_tasks = set()
|
||||
# No background tasks, no active processes in this fresh process.
|
||||
assert r._scale_to_zero_has_live_background_work() is False
|
||||
|
|
@ -1581,6 +1581,23 @@ class ProcessRegistry:
|
|||
for s in self._running.values()
|
||||
)
|
||||
|
||||
def has_any_active(self) -> bool:
|
||||
"""Whether ANY background process is still running (across all sessions).
|
||||
|
||||
Used by scale-to-zero idle detection (gateway/scale_to_zero): a gateway
|
||||
with a live background process (terminal background=true) is NOT idle and
|
||||
must not be suspended, or the process is lost. Refreshes detached
|
||||
sessions first so a finished-but-unreaped process reads as inactive.
|
||||
"""
|
||||
with self._lock:
|
||||
sessions = list(self._running.values())
|
||||
|
||||
for session in sessions:
|
||||
self._refresh_detached_session(session)
|
||||
|
||||
with self._lock:
|
||||
return any(not s.exited for s in self._running.values())
|
||||
|
||||
def kill_all(self, task_id: str = None) -> int:
|
||||
"""Kill all running processes, optionally filtered by task_id. Returns count killed."""
|
||||
with self._lock:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue