mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
feat(service_manager): add S6ServiceManager for runtime gateway supervision
Phase 3 of the s6-overlay supervision plan. Implements the runtime-
registration surface from D4 — only the s6 backend supports
register_profile_gateway / unregister_profile_gateway /
list_profile_gateways; host backends continue to raise
NotImplementedError. No caller yet (Phase 4 wires in the profile
create/delete hooks).
Key implementation notes:
- Service directory shape: /run/service/gateway-<profile>/{type,run,log/run}.
Atomic register: write to gateway-<profile>.tmp, fsync via
os.rename. Cleanup on rescan failure.
- Run script uses #!/command/with-contenv sh so HERMES_HOME and any
extra_env arrive at exec time. The hermes -p <profile> gateway
start --foreground --port <port> command is wrapped in
s6-setuidgid hermes for the per-service privilege drop (OQ2-A).
- Log script (OQ8-C): persists via s6-log to
${HERMES_HOME}/logs/gateways/<profile>/. CRITICAL — HERMES_HOME is
a runtime env-var expansion in the rendered script, NOT a Python
f-string substitution. Negative-asserted in
test_s6_register_creates_service_dir_and_triggers_scan so
regressions are caught.
- PATH gotcha: /command/ is only on PATH for processes spawned by
the supervision tree (services, cont-init.d). `docker exec` and
profile-create hooks don't get it. S6ServiceManager calls all
s6-* binaries via absolute path through the new _S6_BIN_DIR
constant so callers don't have to fix up env vars.
- validate_profile_name rejects path-traversal, leading-dash (s6
would parse as a flag), uppercase, whitespace, and names >251
chars (s6-svscan default name_max).
Test coverage:
- 13 new unit tests in tests/hermes_cli/test_service_manager.py
(kind detection, run-script content, env quoting, register
rollback on rescan failure, unregister idempotence, list filter,
lifecycle dispatch, svstat parsing). Total: 36 passing.
- 2 new in-container integration tests in
tests/docker/test_s6_profile_gateway_integration.py validating
end-to-end registration against a real s6 supervision tree.
Docker harness: 14 passed, 2 xfailed (Phase 4 target unchanged).
Refs: docs/plans/2026-05-07-s6-overlay-dynamic-subagent-gateways.md
This commit is contained in:
parent
4826ea7b41
commit
ad5fdab092
3 changed files with 622 additions and 11 deletions
|
|
@ -279,9 +279,7 @@ def get_service_manager() -> ServiceManager:
|
|||
"""Return the ServiceManager instance for the current environment.
|
||||
|
||||
Raises:
|
||||
RuntimeError: when no supported backend is available, or when
|
||||
the detected backend's implementation hasn't shipped yet
|
||||
(the s6 backend lands in Phase 3).
|
||||
RuntimeError: when no supported backend is available.
|
||||
"""
|
||||
kind = detect_service_manager()
|
||||
if kind == "systemd":
|
||||
|
|
@ -291,6 +289,283 @@ def get_service_manager() -> ServiceManager:
|
|||
if kind == "windows":
|
||||
return WindowsServiceManager()
|
||||
if kind == "s6":
|
||||
# Phase 3 will replace this with `return S6ServiceManager()`.
|
||||
raise RuntimeError("s6 backend not yet implemented (Phase 3)")
|
||||
return S6ServiceManager()
|
||||
raise RuntimeError("no supported service manager detected")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# S6ServiceManager (container-only)
|
||||
#
|
||||
# Per-profile gateways are registered dynamically when `hermes profile create`
|
||||
# runs inside the container (Phase 4). Static services (main-hermes, dashboard)
|
||||
# live in /etc/s6-overlay/s6-rc.d/ and are NOT managed by this class — they're
|
||||
# part of the image, not runtime-created.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
# s6-overlay's dynamic scandir for runtime-registered services. Lives on
|
||||
# tmpfs and is the directory s6-svscan watches. Writes here trigger
|
||||
# automatic supervision on the next rescan.
|
||||
S6_DYNAMIC_SCANDIR = Path("/run/service")
|
||||
S6_SERVICE_PREFIX = "gateway-"
|
||||
|
||||
# s6-overlay installs its binaries under /command/ and only adds that
|
||||
# directory to PATH for processes started under the supervision tree
|
||||
# (services started by s6-svscan, cont-init.d scripts, etc.). Code
|
||||
# that runs via `docker exec` or any other out-of-tree entry point —
|
||||
# notably our Phase 4 profile create/delete hooks — inherits the
|
||||
# container's base PATH which does NOT include /command/.
|
||||
#
|
||||
# Rather than asking every caller to fix up its environment, the
|
||||
# S6ServiceManager calls s6-* binaries by absolute path via this
|
||||
# constant. We don't use `/usr/bin/s6-…` symlinks because the
|
||||
# s6-overlay-symlinks-noarch tarball only links a subset, and we
|
||||
# want every s6 invocation to be guaranteed-findable.
|
||||
_S6_BIN_DIR = "/command"
|
||||
|
||||
|
||||
class S6ServiceManager:
|
||||
"""Per-profile gateway supervision via s6-overlay.
|
||||
|
||||
Only handles runtime-registered services under
|
||||
``S6_DYNAMIC_SCANDIR``. Static services (main-hermes, dashboard)
|
||||
are managed by s6-rc at image-build time and are out of scope.
|
||||
"""
|
||||
|
||||
kind: ServiceManagerKind = "s6"
|
||||
|
||||
def __init__(self, scandir: Path = S6_DYNAMIC_SCANDIR) -> None:
|
||||
self.scandir = scandir
|
||||
|
||||
# -- internal helpers --------------------------------------------------
|
||||
|
||||
def _service_dir(self, profile: str) -> Path:
|
||||
validate_profile_name(profile)
|
||||
return self.scandir / f"{S6_SERVICE_PREFIX}{profile}"
|
||||
|
||||
def _service_name(self, profile: str) -> str:
|
||||
return f"{S6_SERVICE_PREFIX}{profile}"
|
||||
|
||||
@staticmethod
|
||||
def _render_run_script(
|
||||
profile: str,
|
||||
port: int,
|
||||
extra_env: dict[str, str],
|
||||
) -> str:
|
||||
"""Generate the run script for a profile-gateway s6 service.
|
||||
|
||||
The script:
|
||||
1. Sources HERMES_HOME (and any extra env) via with-contenv —
|
||||
so e.g. ``-e HERMES_HOME=/data/hermes`` is honored at run
|
||||
time, not Python-substituted at registration time (OQ8-C).
|
||||
2. Activates the bundled venv.
|
||||
3. Drops to the hermes user and exec's
|
||||
``hermes -p <profile> gateway start --foreground --port <port>``.
|
||||
"""
|
||||
import shlex
|
||||
lines = [
|
||||
"#!/command/with-contenv sh",
|
||||
"# shellcheck shell=sh",
|
||||
"set -e",
|
||||
"cd /opt/data",
|
||||
". /opt/hermes/.venv/bin/activate",
|
||||
]
|
||||
for k, v in sorted(extra_env.items()):
|
||||
lines.append(f"export {k}={shlex.quote(v)}")
|
||||
lines.append(
|
||||
f"exec s6-setuidgid hermes hermes -p {shlex.quote(profile)} "
|
||||
f"gateway start --foreground --port {port}"
|
||||
)
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
@staticmethod
|
||||
def _render_log_run(profile: str) -> str:
|
||||
"""Generate the log/run script for a profile-gateway service.
|
||||
|
||||
OQ8-C: persist to ``${HERMES_HOME}/logs/gateways/<profile>/``.
|
||||
CRITICAL: the HERMES_HOME path is sourced from the runtime env
|
||||
via with-contenv — NOT Python-substituted at registration time
|
||||
— so a container started with ``-e HERMES_HOME=/data/hermes``
|
||||
gets its logs under /data/hermes/logs/..., not the build-time
|
||||
default.
|
||||
"""
|
||||
import shlex
|
||||
prof = shlex.quote(profile)
|
||||
return (
|
||||
f"#!/command/with-contenv sh\n"
|
||||
f"# shellcheck shell=sh\n"
|
||||
f': "${{HERMES_HOME:=/opt/data}}"\n'
|
||||
f'log_dir="$HERMES_HOME/logs/gateways/{prof}"\n'
|
||||
f'mkdir -p "$log_dir"\n'
|
||||
f'chown -R hermes:hermes "$log_dir" 2>/dev/null || true\n'
|
||||
f'exec s6-setuidgid hermes s6-log n10 s1000000 T "$log_dir"\n'
|
||||
)
|
||||
|
||||
# -- lifecycle ---------------------------------------------------------
|
||||
|
||||
def start(self, name: str) -> None:
|
||||
"""Bring up a registered service (``s6-svc -u``)."""
|
||||
import subprocess
|
||||
subprocess.run(
|
||||
[f"{_S6_BIN_DIR}/s6-svc", "-u", str(self.scandir / name)],
|
||||
check=True, capture_output=True, timeout=5,
|
||||
)
|
||||
|
||||
def stop(self, name: str) -> None:
|
||||
"""Bring down a registered service (``s6-svc -d``)."""
|
||||
import subprocess
|
||||
subprocess.run(
|
||||
[f"{_S6_BIN_DIR}/s6-svc", "-d", str(self.scandir / name)],
|
||||
check=True, capture_output=True, timeout=5,
|
||||
)
|
||||
|
||||
def restart(self, name: str) -> None:
|
||||
"""Restart a registered service (``s6-svc -t`` = SIGTERM)."""
|
||||
import subprocess
|
||||
subprocess.run(
|
||||
[f"{_S6_BIN_DIR}/s6-svc", "-t", str(self.scandir / name)],
|
||||
check=True, capture_output=True, timeout=5,
|
||||
)
|
||||
|
||||
def is_running(self, name: str) -> bool:
|
||||
"""True iff ``s6-svstat`` reports the service as up."""
|
||||
import subprocess
|
||||
result = subprocess.run(
|
||||
[f"{_S6_BIN_DIR}/s6-svstat", str(self.scandir / name)],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
return result.returncode == 0 and "up " in result.stdout
|
||||
|
||||
# -- runtime registration ---------------------------------------------
|
||||
|
||||
def supports_runtime_registration(self) -> bool:
|
||||
return True
|
||||
|
||||
def register_profile_gateway(
|
||||
self,
|
||||
profile: str,
|
||||
*,
|
||||
port: int,
|
||||
extra_env: dict[str, str] | None = None,
|
||||
) -> None:
|
||||
"""Create the s6 service directory for a profile gateway.
|
||||
|
||||
Triggers ``s6-svscanctl -a`` so s6-svscan picks the new directory
|
||||
up immediately. The service is created in the *up* state — to
|
||||
register without auto-starting, follow up with ``stop(profile)``
|
||||
(or pass the start flag via the future ``start_now=False`` arg,
|
||||
which the Phase 4 reconciliation path uses via a ``down``
|
||||
marker file written directly).
|
||||
|
||||
Raises:
|
||||
ValueError: if the profile name is invalid or the service
|
||||
directory already exists.
|
||||
RuntimeError: if ``s6-svscanctl`` fails.
|
||||
"""
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
svc_dir = self._service_dir(profile)
|
||||
if svc_dir.exists():
|
||||
raise ValueError(
|
||||
f"profile gateway {profile!r} already registered at {svc_dir}"
|
||||
)
|
||||
|
||||
# Build the service directory atomically: write to a sibling
|
||||
# temp dir, then rename. Avoids s6-svscan observing a half-
|
||||
# populated directory on a fast rescan.
|
||||
tmp_dir = svc_dir.with_name(svc_dir.name + ".tmp")
|
||||
if tmp_dir.exists():
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
tmp_dir.mkdir(parents=True)
|
||||
|
||||
try:
|
||||
(tmp_dir / "type").write_text("longrun\n")
|
||||
|
||||
run_script = self._render_run_script(profile, port, extra_env or {})
|
||||
run_path = tmp_dir / "run"
|
||||
run_path.write_text(run_script)
|
||||
run_path.chmod(0o755)
|
||||
|
||||
# Persistent log rotation (OQ8-C).
|
||||
log_subdir = tmp_dir / "log"
|
||||
log_subdir.mkdir()
|
||||
log_run = log_subdir / "run"
|
||||
log_run.write_text(self._render_log_run(profile))
|
||||
log_run.chmod(0o755)
|
||||
|
||||
tmp_dir.rename(svc_dir)
|
||||
except Exception:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
raise
|
||||
|
||||
# Trigger rescan so s6-svscan picks up the new service.
|
||||
result = subprocess.run(
|
||||
[f"{_S6_BIN_DIR}/s6-svscanctl", "-a", str(self.scandir)],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
# Clean up: rescan failed, leave the directory in place would
|
||||
# be confusing (no supervisor watching it).
|
||||
shutil.rmtree(svc_dir, ignore_errors=True)
|
||||
raise RuntimeError(
|
||||
f"s6-svscanctl failed: {result.stderr or result.stdout}"
|
||||
)
|
||||
|
||||
def unregister_profile_gateway(self, profile: str) -> None:
|
||||
"""Stop the profile gateway service and remove its directory.
|
||||
|
||||
Idempotent: absent services are a no-op. Best-effort stop +
|
||||
wait-for-down before removal so the running gateway process
|
||||
gets a chance to shut down cleanly before its service dir
|
||||
disappears.
|
||||
"""
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
svc_dir = self._service_dir(profile)
|
||||
if not svc_dir.exists():
|
||||
return
|
||||
|
||||
# Stop the service (best effort — service may already be down).
|
||||
subprocess.run(
|
||||
[f"{_S6_BIN_DIR}/s6-svc", "-d", str(svc_dir)],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
check=False,
|
||||
)
|
||||
# Wait for it to actually go down (up to 10s).
|
||||
subprocess.run(
|
||||
[f"{_S6_BIN_DIR}/s6-svwait", "-D", "-t", "10000", str(svc_dir)],
|
||||
capture_output=True, text=True, timeout=15,
|
||||
check=False,
|
||||
)
|
||||
|
||||
# Remove the directory.
|
||||
shutil.rmtree(svc_dir, ignore_errors=True)
|
||||
|
||||
# Rescan so s6-svscan drops its supervise process for the dir.
|
||||
# -n = also reap orphan supervise processes.
|
||||
subprocess.run(
|
||||
[f"{_S6_BIN_DIR}/s6-svscanctl", "-an", str(self.scandir)],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
check=False,
|
||||
)
|
||||
|
||||
def list_profile_gateways(self) -> list[str]:
|
||||
"""Return the profile names of all currently-registered gateway services.
|
||||
|
||||
Filters the scandir to entries that match the ``gateway-`` prefix.
|
||||
Other services (e.g. ``s6-linux-init-shutdownd``) are ignored.
|
||||
"""
|
||||
if not self.scandir.exists():
|
||||
return []
|
||||
profiles: list[str] = []
|
||||
for entry in self.scandir.iterdir():
|
||||
if entry.name.startswith("."):
|
||||
continue
|
||||
if not entry.is_dir():
|
||||
continue
|
||||
if not entry.name.startswith(S6_SERVICE_PREFIX):
|
||||
continue
|
||||
profiles.append(entry.name[len(S6_SERVICE_PREFIX):])
|
||||
return profiles
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue