mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-15 09:21:36 +00:00
fix(s6): persist profile gateway desired state (#46292)
* fix: persist s6 gateway desired state * chore(release): map salvaged contributor --------- Co-authored-by: Alfred Smith <alfred@my-cloud.me> Co-authored-by: Ben <ben@nousresearch.com>
This commit is contained in:
parent
61ee2dbfdb
commit
b770967263
5 changed files with 175 additions and 13 deletions
|
|
@ -28,11 +28,14 @@ from typing import Literal, Sequence
|
|||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# Only this prior state triggers automatic restart. Everything else
|
||||
# Only this desired state triggers automatic restart. Everything else
|
||||
# (startup_failed, starting, stopped, missing) registers the slot in
|
||||
# the down state and waits for explicit user action — this avoids the
|
||||
# crash-loop where a broken gateway keeps being restarted across
|
||||
# `docker restart` cycles.
|
||||
# `docker restart` cycles. Older installs only have gateway_state;
|
||||
# newer lifecycle commands persist desired_state separately so a transient
|
||||
# runtime state (draining/startup_failed) does not erase the operator's
|
||||
# durable start/stop intent across pod/container recreation.
|
||||
_AUTOSTART_STATES = frozenset({"running"})
|
||||
|
||||
# Stale runtime files we sweep before recreating service slots. These
|
||||
|
|
@ -104,7 +107,7 @@ def reconcile_profile_gateways(
|
|||
container_argv=container_argv,
|
||||
dry_run=dry_run,
|
||||
)
|
||||
default_prior_state = legacy_default_state or _read_prior_state(hermes_home)
|
||||
default_prior_state = legacy_default_state or _read_desired_state(hermes_home)
|
||||
default_should_start = default_prior_state in _AUTOSTART_STATES
|
||||
if not dry_run:
|
||||
_cleanup_stale_runtime_files(hermes_home)
|
||||
|
|
@ -139,7 +142,7 @@ def reconcile_profile_gateways(
|
|||
)
|
||||
continue
|
||||
|
||||
prior_state = _read_prior_state(entry)
|
||||
prior_state = _read_desired_state(entry)
|
||||
should_start = prior_state in _AUTOSTART_STATES
|
||||
|
||||
if not dry_run:
|
||||
|
|
@ -188,6 +191,7 @@ def _maybe_migrate_legacy_gateway_run_state(
|
|||
import time
|
||||
state_file.write_text(json.dumps({
|
||||
"gateway_state": "running",
|
||||
"desired_state": "running",
|
||||
"timestamp": int(time.time()),
|
||||
"migrated_from": "legacy-container-cmd",
|
||||
}) + "\n")
|
||||
|
|
@ -217,15 +221,26 @@ def _is_legacy_gateway_run_request(argv: Sequence[str]) -> bool:
|
|||
return len(args) >= 2 and args[0] == "gateway" and args[1] == "run"
|
||||
|
||||
|
||||
def _read_prior_state(profile_dir: Path) -> str | None:
|
||||
"""Read gateway_state.json's ``gateway_state`` field, or None if
|
||||
missing or unparseable. Unparseable counts as "no prior state" so
|
||||
we don't bork the whole reconciliation on a corrupt file."""
|
||||
def _read_desired_state(profile_dir: Path) -> str | None:
|
||||
"""Read the persisted gateway desired state for reconciliation.
|
||||
|
||||
Newer state files carry ``desired_state``: operator intent written by
|
||||
s6 lifecycle commands. Older files only carry ``gateway_state``; keep
|
||||
that as a compatibility fallback so existing running/stopped profiles
|
||||
preserve their behavior until the next explicit start/stop.
|
||||
|
||||
Missing or unparseable files count as "no desired state" so we don't
|
||||
bork the whole reconciliation on a corrupt file.
|
||||
"""
|
||||
state_file = profile_dir / "gateway_state.json"
|
||||
if not state_file.exists():
|
||||
return None
|
||||
try:
|
||||
return json.loads(state_file.read_text()).get("gateway_state")
|
||||
data = json.loads(state_file.read_text())
|
||||
desired_state = data.get("desired_state")
|
||||
if desired_state is not None:
|
||||
return desired_state
|
||||
return data.get("gateway_state")
|
||||
except (OSError, json.JSONDecodeError):
|
||||
log.warning(
|
||||
"could not read %s; treating as no prior state", state_file,
|
||||
|
|
|
|||
|
|
@ -334,6 +334,62 @@ def get_service_manager() -> ServiceManager:
|
|||
S6_DYNAMIC_SCANDIR = Path("/run/service")
|
||||
S6_SERVICE_PREFIX = "gateway-"
|
||||
|
||||
|
||||
def _profile_dir_for_gateway_service(name: str) -> Path:
    """Map an s6 service name to the profile directory that holds its state.

    Lifecycle commands can be issued from any active profile (for example
    ``gateway stop --all``), so the caller's HERMES_HOME is not necessarily
    the directory of the service being controlled. The shared root is
    derived from HERMES_HOME and the profile named by the service suffix is
    resolved underneath it.

    Args:
        name: Service name, normally ``gateway-<profile>``. A name without
            the ``gateway-`` prefix is treated as the profile name itself.

    Returns:
        The root directory for the ``default`` profile, otherwise
        ``<root>/profiles/<profile>``.
    """
    import os

    if name.startswith(S6_SERVICE_PREFIX):
        profile = name[len(S6_SERVICE_PREFIX):]
    else:
        profile = name
    validate_profile_name(profile)

    active_home = Path(os.environ.get("HERMES_HOME", "/opt/data"))
    # When HERMES_HOME looks like <root>/profiles/<something>, the caller is
    # running inside a named profile; step up two levels to reach the root.
    inside_named_profile = active_home.parent.name == "profiles"
    root = active_home.parent.parent if inside_named_profile else active_home

    if profile == "default":
        return root
    return root / "profiles" / profile
|
||||
|
||||
|
||||
def _write_gateway_desired_state(name: str, desired_state: str) -> None:
    """Persist durable s6 gateway intent next to runtime status.

    ``gateway_state`` remains the volatile runtime field written by the
    gateway process. ``desired_state`` records the operator's start/stop
    intent so container-boot reconciliation can restore the correct s6
    want-up/want-down state after pod recreation even if the previous runtime
    state was transient (draining, startup_failed, etc.).

    The current ``gateway_state.json`` object (if readable) is kept, with
    ``desired_state`` and ``updated_at`` set on top of it. The result is
    written to a sibling ``.tmp`` file and renamed over the real file so a
    reader never sees a half-written document.

    The write is best-effort: a failed persistence attempt must not prevent
    immediate s6 lifecycle control. Filesystem errors are swallowed, and an
    unreadable, undecodable, or non-object state file is treated as empty.
    Errors raised while resolving the profile directory from ``name`` are
    not caught here.

    Args:
        name: s6 service name (``gateway-<profile>``) whose profile
            directory receives the state file.
        desired_state: Value stored under the ``desired_state`` key.
    """
    import json
    import time

    profile_dir = _profile_dir_for_gateway_service(name)
    state_file = profile_dir / "gateway_state.json"
    tmp = state_file.with_suffix(state_file.suffix + ".tmp")
    try:
        if not profile_dir.exists():
            # Nothing to persist for a profile that has no directory.
            return
        try:
            data = json.loads(state_file.read_text()) if state_file.exists() else {}
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError that read_text() raises on invalid bytes;
            # either way the old content is unusable, so start fresh.
            data = {}
        if not isinstance(data, dict):
            data = {}
        data["desired_state"] = desired_state
        data["updated_at"] = int(time.time())
        tmp.write_text(json.dumps(data, separators=(",", ":")) + "\n")
        tmp.replace(state_file)
    except OSError:
        # Best-effort: drop any partial temp file and carry on silently.
        try:
            tmp.unlink()
        except OSError:
            pass
        return
|
||||
|
||||
|
||||
# s6-overlay installs its binaries under /command/ and only adds that
|
||||
# directory to PATH for processes started under the supervision tree
|
||||
# (services started by s6-svscan, cont-init.d scripts, etc.). Code
|
||||
|
|
@ -761,6 +817,7 @@ class S6ServiceManager:
|
|||
(permission denied on the supervise FIFO, timeout, etc.).
|
||||
"""
|
||||
self._run_svc("-u", "start", name)
|
||||
_write_gateway_desired_state(name, "running")
|
||||
|
||||
def _supervised_pid(self, name: str) -> int | None:
|
||||
"""Return the PID of the supervised gateway process, or None.
|
||||
|
|
@ -812,6 +869,7 @@ class S6ServiceManager:
|
|||
except Exception:
|
||||
pass
|
||||
self._run_svc("-d", "stop", name)
|
||||
_write_gateway_desired_state(name, "stopped")
|
||||
|
||||
def restart(self, name: str) -> None:
|
||||
"""Restart a registered service (``s6-svc -t`` = SIGTERM).
|
||||
|
|
@ -821,6 +879,7 @@ class S6ServiceManager:
|
|||
S6CommandError: s6-svc exited non-zero for any other reason.
|
||||
"""
|
||||
self._run_svc("-t", "restart", name)
|
||||
_write_gateway_desired_state(name, "running")
|
||||
|
||||
def is_running(self, name: str) -> bool:
|
||||
"""True iff ``s6-svstat`` reports the service as up."""
|
||||
|
|
|
|||
|
|
@ -84,6 +84,7 @@ AUTHOR_MAP = {
|
|||
"290859878+synapsesx@users.noreply.github.com": "synapsesx",
|
||||
"157689911+itsflownium@users.noreply.github.com": "itsflownium",
|
||||
"dirtyren@users.noreply.github.com": "dirtyren",
|
||||
"alfred@my-cloud.me": "alfred-smith-0",
|
||||
"tangtaizhong792@gmail.com": "tangtaizong666",
|
||||
"github@aldo.pw": "aldoeliacim",
|
||||
"max@c60spaceship.com": "MaxFreedomPollard",
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ def _make_profile(
|
|||
name: str,
|
||||
*,
|
||||
state: str | None,
|
||||
desired_state: str | None = None,
|
||||
with_pid: bool = False,
|
||||
config: bool = True,
|
||||
) -> Path:
|
||||
|
|
@ -40,10 +41,13 @@ def _make_profile(
|
|||
# SOUL.md is what the reconciler keys on — it's always seeded by
|
||||
# `hermes profile create`. See container_boot._render_run_script.
|
||||
(p / "SOUL.md").write_text("# fake profile\n")
|
||||
if state is not None:
|
||||
(p / "gateway_state.json").write_text(json.dumps({
|
||||
"gateway_state": state, "timestamp": 1234567890,
|
||||
}))
|
||||
if state is not None or desired_state is not None:
|
||||
payload: dict[str, object] = {"timestamp": 1234567890}
|
||||
if state is not None:
|
||||
payload["gateway_state"] = state
|
||||
if desired_state is not None:
|
||||
payload["desired_state"] = desired_state
|
||||
(p / "gateway_state.json").write_text(json.dumps(payload))
|
||||
if with_pid:
|
||||
(p / "gateway.pid").write_text(json.dumps(
|
||||
{"pid": 99999, "host": "old-container"},
|
||||
|
|
@ -130,6 +134,46 @@ def test_startup_failed_does_not_autostart(tmp_path: Path) -> None:
|
|||
assert (scandir / "gateway-broken" / "down").exists()
|
||||
|
||||
|
||||
def test_desired_state_running_autostarts_even_if_runtime_failed(tmp_path: Path) -> None:
    """A persisted ``desired_state`` of running beats a failed runtime state.

    The profile's last runtime state is ``startup_failed``, but the operator
    intent on disk is ``running``; reconciliation must start the slot and
    must not leave a ``down`` marker behind.
    """
    scandir = tmp_path / "run-service"
    scandir.mkdir()
    _make_profile(tmp_path, "resilient", state="startup_failed", desired_state="running")

    actions = reconcile_profile_gateways(
        hermes_home=tmp_path,
        scandir=scandir,
        dry_run=False,
    )

    expected = ReconcileAction(
        profile="resilient",
        prior_state="running",
        action="started",
    )
    assert _named_actions(actions) == [expected]
    down_marker = scandir / "gateway-resilient" / "down"
    assert not down_marker.exists()
|
||||
|
||||
|
||||
def test_desired_state_stopped_blocks_legacy_running_runtime(tmp_path: Path) -> None:
    """An explicit stop outlives a stale ``gateway_state`` of running.

    The runtime field still says ``running`` while the persisted intent is
    ``stopped``; reconciliation must only register the slot and keep its
    ``down`` marker.
    """
    scandir = tmp_path / "run-service"
    scandir.mkdir()
    _make_profile(tmp_path, "quiet", state="running", desired_state="stopped")

    actions = reconcile_profile_gateways(
        hermes_home=tmp_path,
        scandir=scandir,
        dry_run=False,
    )

    expected = ReconcileAction(
        profile="quiet",
        prior_state="stopped",
        action="registered",
    )
    assert _named_actions(actions) == [expected]
    down_marker = scandir / "gateway-quiet" / "down"
    assert down_marker.exists()
|
||||
|
||||
|
||||
def test_starting_state_does_not_autostart(tmp_path: Path) -> None:
|
||||
"""`starting` means the gateway died mid-boot last time; treat as
|
||||
failed, not as a candidate for auto-restart."""
|
||||
|
|
@ -513,6 +557,7 @@ def test_legacy_gateway_run_cmd_seeds_default_running_state(
|
|||
assert not (scandir / "gateway-default" / "down").exists()
|
||||
state = json.loads((tmp_path / "gateway_state.json").read_text())
|
||||
assert state["gateway_state"] == "running"
|
||||
assert state["desired_state"] == "running"
|
||||
assert state["migrated_from"] == "legacy-container-cmd"
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -726,6 +726,48 @@ def test_s6_lifecycle_dispatches_to_s6_svc(
|
|||
assert flags == ["-u", "-d", "-t"]
|
||||
|
||||
|
||||
def test_s6_lifecycle_persists_named_profile_desired_state(
    s6_scandir,
    fake_subprocess_run,
    tmp_path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """start/stop/restart each record intent in the named profile's state file."""
    import json

    hermes_home = tmp_path / "hermes-home"
    profile_dir = hermes_home / "profiles" / "coder"
    profile_dir.mkdir(parents=True)
    (s6_scandir / "gateway-coder").mkdir()
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))

    def persisted_intent() -> str:
        # Re-read from disk every time so each lifecycle call is checked
        # against what was actually written.
        state_path = profile_dir / "gateway_state.json"
        return json.loads(state_path.read_text())["desired_state"]

    mgr = S6ServiceManager(scandir=s6_scandir)

    mgr.start("gateway-coder")
    assert persisted_intent() == "running"

    mgr.stop("gateway-coder")
    assert persisted_intent() == "stopped"

    mgr.restart("gateway-coder")
    assert persisted_intent() == "running"
|
||||
|
||||
|
||||
def test_s6_lifecycle_persists_default_profile_desired_state(
    s6_scandir,
    fake_subprocess_run,
    tmp_path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Starting ``gateway-default`` from inside a named profile writes to the root.

    HERMES_HOME points at ``<root>/profiles/coder``, yet the default
    profile's state file must land in ``<root>`` itself.
    """
    import json

    hermes_home = tmp_path / "hermes-home"
    hermes_home.mkdir()
    (s6_scandir / "gateway-default").mkdir()
    caller_home = hermes_home / "profiles" / "coder"
    monkeypatch.setenv("HERMES_HOME", str(caller_home))

    mgr = S6ServiceManager(scandir=s6_scandir)
    mgr.start("gateway-default")

    root_state_path = hermes_home / "gateway_state.json"
    state = json.loads(root_state_path.read_text())
    assert state["desired_state"] == "running"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Lifecycle errors — friendly messages, not raw CalledProcessError
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue