diff --git a/gateway/run.py b/gateway/run.py index da78f5e9e..b3270d958 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -10052,6 +10052,16 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = "Replacing existing gateway instance (PID %d) with --replace.", existing_pid, ) + # Record a takeover marker so the target's shutdown handler + # recognises its SIGTERM as a planned takeover and exits 0 + # (rather than exit 1, which would trigger systemd's + # Restart=on-failure and start a flap loop against us). + # Best-effort — proceed even if the write fails. + try: + from gateway.status import write_takeover_marker + write_takeover_marker(existing_pid) + except Exception as e: + logger.debug("Could not write takeover marker: %s", e) try: terminate_pid(existing_pid, force=False) except ProcessLookupError: @@ -10061,6 +10071,13 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = "Permission denied killing PID %d. Cannot replace.", existing_pid, ) + # Marker is scoped to a specific target; clean it up on + # give-up so it doesn't grief an unrelated future shutdown. + try: + from gateway.status import clear_takeover_marker + clear_takeover_marker() + except Exception: + pass return False # Wait up to 10 seconds for the old process to exit for _ in range(20): @@ -10081,6 +10098,13 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = except (ProcessLookupError, PermissionError, OSError): pass remove_pid_file() + # Clean up any takeover marker the old process didn't consume + # (e.g. SIGKILL'd before its shutdown handler could read it). + try: + from gateway.status import clear_takeover_marker + clear_takeover_marker() + except Exception: + pass # Also release all scoped locks left by the old process. # Stopped (Ctrl+Z) processes don't release locks on exit, # leaving stale lock files that block the new gateway from starting. @@ -10148,8 +10172,27 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = # Set up signal handlers def shutdown_signal_handler(): nonlocal _signal_initiated_shutdown - _signal_initiated_shutdown = True - logger.info("Received SIGTERM/SIGINT — initiating shutdown") + # Planned --replace takeover check: when a sibling gateway is + # taking over via --replace, it wrote a marker naming this PID + # before sending SIGTERM. If present, treat the signal as a + # planned shutdown and exit 0 so systemd's Restart=on-failure + # doesn't revive us (which would flap-fight the replacer when + # both services are enabled, e.g. hermes.service + hermes- + # gateway.service from pre-rename installs). + planned_takeover = False + try: + from gateway.status import consume_takeover_marker_for_self + planned_takeover = consume_takeover_marker_for_self() + except Exception as e: + logger.debug("Takeover marker check failed: %s", e) + + if planned_takeover: + logger.info( + "Received SIGTERM as a planned --replace takeover — exiting cleanly" + ) + else: + _signal_initiated_shutdown = True + logger.info("Received SIGTERM/SIGINT — initiating shutdown") # Diagnostic: log all hermes-related processes so we can identify # what triggered the signal (hermes update, hermes gateway restart, # a stale detached subprocess, etc.). diff --git a/gateway/status.py b/gateway/status.py index 34c517dcf..e1598e179 100644 --- a/gateway/status.py +++ b/gateway/status.py @@ -425,6 +425,133 @@ def release_all_scoped_locks() -> int: return removed +# ── --replace takeover marker ───────────────────────────────────────── +# +# When a new gateway starts with ``--replace``, it SIGTERMs the existing +# gateway so it can take over the bot token. PR #5646 made SIGTERM exit +# the gateway with code 1 so ``Restart=on-failure`` can revive it after +# unexpected kills — but that also means a --replace takeover target +# exits 1, which tricks systemd into reviving it 30 seconds later, +# starting a flap loop against the replacer when both services are +# enabled in the user's systemd (e.g. ``hermes.service`` + ``hermes- +# gateway.service``). +# +# The takeover marker breaks the loop: the replacer writes a short-lived +# file naming the target PID + start_time BEFORE sending SIGTERM. +# The target's shutdown handler reads the marker and, if it names +# this process, treats the SIGTERM as a planned takeover and exits 0. +# The marker is unlinked after the target has consumed it, so a stale +# marker left by a crashed replacer can grief at most one future +# shutdown on the same PID — and only within _TAKEOVER_MARKER_TTL_S. + +_TAKEOVER_MARKER_FILENAME = ".gateway-takeover.json" +_TAKEOVER_MARKER_TTL_S = 60 # Marker older than this is treated as stale + + +def _get_takeover_marker_path() -> Path: + """Return the path to the --replace takeover marker file.""" + home = get_hermes_home() + return home / _TAKEOVER_MARKER_FILENAME + + +def write_takeover_marker(target_pid: int) -> bool: + """Record that ``target_pid`` is being replaced by the current process. + + Captures the target's ``start_time`` so that PID reuse after the + target exits cannot later match the marker. Also records the + replacer's PID and a UTC timestamp for TTL-based staleness checks. + + Returns True on successful write, False on any failure. The caller + should proceed with the SIGTERM even if the write fails (the marker + is a best-effort signal, not a correctness requirement). + """ + try: + target_start_time = _get_process_start_time(target_pid) + record = { + "target_pid": target_pid, + "target_start_time": target_start_time, + "replacer_pid": os.getpid(), + "written_at": _utc_now_iso(), + } + _write_json_file(_get_takeover_marker_path(), record) + return True + except (OSError, PermissionError): + return False + + +def consume_takeover_marker_for_self() -> bool: + """Check & unlink the takeover marker if it names the current process. + + Returns True only when a valid (non-stale) marker names this PID + + start_time. A returning True indicates the current SIGTERM is a + planned --replace takeover; the caller should exit 0 instead of + signalling ``_signal_initiated_shutdown``. + + Always unlinks the marker on match (and on detected staleness) so + subsequent unrelated signals don't re-trigger. + """ + path = _get_takeover_marker_path() + record = _read_json_file(path) + if not record: + return False + + # Any malformed or stale marker → drop it and return False + try: + target_pid = int(record["target_pid"]) + target_start_time = record.get("target_start_time") + written_at = record.get("written_at") or "" + except (KeyError, TypeError, ValueError): + try: + path.unlink(missing_ok=True) + except OSError: + pass + return False + + # TTL guard: a stale marker older than _TAKEOVER_MARKER_TTL_S is ignored. + stale = False + try: + written_dt = datetime.fromisoformat(written_at) + age = (datetime.now(timezone.utc) - written_dt).total_seconds() + if age > _TAKEOVER_MARKER_TTL_S: + stale = True + except (TypeError, ValueError): + stale = True # Unparseable timestamp — treat as stale + + if stale: + try: + path.unlink(missing_ok=True) + except OSError: + pass + return False + + # Does the marker name THIS process? + our_pid = os.getpid() + our_start_time = _get_process_start_time(our_pid) + matches = ( + target_pid == our_pid + and target_start_time is not None + and our_start_time is not None + and target_start_time == our_start_time + ) + + # Consume the marker whether it matched or not — a marker that doesn't + # match our identity is stale-for-us anyway. + try: + path.unlink(missing_ok=True) + except OSError: + pass + + return matches + + +def clear_takeover_marker() -> None: + """Remove the takeover marker unconditionally. Safe to call repeatedly.""" + try: + _get_takeover_marker_path().unlink(missing_ok=True) + except OSError: + pass + + def get_running_pid( pid_path: Optional[Path] = None, *, diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py index 08f9999e6..bc809cadf 100644 --- a/hermes_cli/gateway.py +++ b/hermes_cli/gateway.py @@ -687,6 +687,195 @@ def has_conflicting_systemd_units() -> bool: return len(get_installed_systemd_scopes()) > 1 +# Legacy service names from older Hermes installs that predate the +# hermes-gateway rename. Kept as an explicit allowlist (NOT a glob) so +# profile units (hermes-gateway-*.service) and unrelated third-party +# "hermes" units are never matched. +_LEGACY_SERVICE_NAMES: tuple[str, ...] = ("hermes.service",) + +# ExecStart content markers that identify a unit as running our gateway. +# A legacy unit is only flagged when its file contains one of these. +_LEGACY_UNIT_EXECSTART_MARKERS: tuple[str, ...] = ( + "hermes_cli.main gateway", + "hermes_cli/main.py gateway", + "gateway/run.py", + " hermes gateway ", + "/hermes gateway ", +) + + +def _legacy_unit_search_paths() -> list[tuple[bool, Path]]: + """Return ``[(is_system, base_dir), ...]`` — directories to scan for legacy units. + + Factored out so tests can monkeypatch the search roots without touching + real filesystem paths. + """ + return [ + (False, Path.home() / ".config" / "systemd" / "user"), + (True, Path("/etc/systemd/system")), + ] + + +def _find_legacy_hermes_units() -> list[tuple[str, Path, bool]]: + """Return ``[(unit_name, unit_path, is_system)]`` for legacy Hermes gateway units. + + Detects unit files installed by older Hermes versions that used a + different service name (e.g. ``hermes.service`` before the rename to + ``hermes-gateway.service``). When both a legacy unit and the current + ``hermes-gateway.service`` are active, they fight over the same bot + token — the PR #5646 signal-recovery change turns this into a 30-second + SIGTERM flap loop. + + Safety guards: + + * Explicit allowlist of legacy names (no globbing). Profile units such + as ``hermes-gateway-coder.service`` and unrelated third-party + ``hermes-*`` services are never matched. + * ExecStart content check — only flag units that invoke our gateway + entrypoint. A user-created ``hermes.service`` running an unrelated + binary is left untouched. + * Results are returned purely for caller inspection; this function + never mutates or removes anything. + """ + results: list[tuple[str, Path, bool]] = [] + for is_system, base in _legacy_unit_search_paths(): + for name in _LEGACY_SERVICE_NAMES: + unit_path = base / name + try: + if not unit_path.exists(): + continue + text = unit_path.read_text(encoding="utf-8", errors="ignore") + except (OSError, PermissionError): + continue + if not any(marker in text for marker in _LEGACY_UNIT_EXECSTART_MARKERS): + # Not our gateway — leave alone + continue + results.append((name, unit_path, is_system)) + return results + + +def has_legacy_hermes_units() -> bool: + """Return True when any legacy Hermes gateway unit files exist.""" + return bool(_find_legacy_hermes_units()) + + +def print_legacy_unit_warning() -> None: + """Warn about legacy Hermes gateway unit files if any are installed. + + Idempotent: prints nothing when no legacy units are detected. Safe to + call from any status/install/setup path. + """ + legacy = _find_legacy_hermes_units() + if not legacy: + return + print_warning("Legacy Hermes gateway unit(s) detected from an older install:") + for name, path, is_system in legacy: + scope = "system" if is_system else "user" + print_info(f" {path} ({scope} scope)") + print_info(" These run alongside the current hermes-gateway service and") + print_info(" cause SIGTERM flap loops — both try to use the same bot token.") + print_info(" Remove them with:") + print_info(" hermes gateway migrate-legacy") + + +def remove_legacy_hermes_units( + interactive: bool = True, + dry_run: bool = False, +) -> tuple[int, list[Path]]: + """Stop, disable, and remove legacy Hermes gateway unit files. + + Iterates over whatever ``_find_legacy_hermes_units()`` returns — which is + an explicit allowlist of legacy names (not a glob). Profile units and + unrelated third-party services are never touched. + + Args: + interactive: When True, prompt before removing. When False, remove + without asking (used when another prompt has already confirmed, + e.g. from the install flow). + dry_run: When True, list what would be removed and return. + + Returns: + ``(removed_count, remaining_paths)`` — remaining includes units we + couldn't remove (typically system-scope when not running as root). + """ + legacy = _find_legacy_hermes_units() + if not legacy: + print("No legacy Hermes gateway units found.") + return 0, [] + + user_units = [(n, p) for n, p, is_sys in legacy if not is_sys] + system_units = [(n, p) for n, p, is_sys in legacy if is_sys] + + print() + print("Legacy Hermes gateway unit(s) found:") + for name, path, is_system in legacy: + scope = "system" if is_system else "user" + print(f" {path} ({scope} scope)") + print() + + if dry_run: + print("(dry-run — nothing removed)") + return 0, [p for _, p, _ in legacy] + + if interactive and not prompt_yes_no("Remove these legacy units?", True): + print("Skipped. Run again with: hermes gateway migrate-legacy") + return 0, [p for _, p, _ in legacy] + + removed = 0 + remaining: list[Path] = [] + + # User-scope removal + for name, path in user_units: + try: + _run_systemctl(["stop", name], system=False, check=False, timeout=90) + _run_systemctl(["disable", name], system=False, check=False, timeout=30) + path.unlink(missing_ok=True) + print(f" ✓ Removed {path}") + removed += 1 + except (OSError, RuntimeError) as e: + print(f" ⚠ Could not remove {path}: {e}") + remaining.append(path) + + if user_units: + try: + _run_systemctl(["daemon-reload"], system=False, check=False, timeout=30) + except RuntimeError: + pass + + # System-scope removal (needs root) + if system_units: + if os.geteuid() != 0: + print() + print_warning("System-scope legacy units require root to remove.") + print_info(" Re-run with: sudo hermes gateway migrate-legacy") + for _, path in system_units: + remaining.append(path) + else: + for name, path in system_units: + try: + _run_systemctl(["stop", name], system=True, check=False, timeout=90) + _run_systemctl(["disable", name], system=True, check=False, timeout=30) + path.unlink(missing_ok=True) + print(f" ✓ Removed {path}") + removed += 1 + except (OSError, RuntimeError) as e: + print(f" ⚠ Could not remove {path}: {e}") + remaining.append(path) + + try: + _run_systemctl(["daemon-reload"], system=True, check=False, timeout=30) + except RuntimeError: + pass + + print() + if remaining: + print_warning(f"{len(remaining)} legacy unit(s) still present — see messages above.") + else: + print_success(f"Removed {removed} legacy unit(s).") + + return removed, remaining + + def print_systemd_scope_conflict_warning() -> None: scopes = get_installed_systemd_scopes() if len(scopes) < 2: @@ -1220,6 +1409,19 @@ def systemd_install(force: bool = False, system: bool = False, run_as_user: str if system: _require_root_for_system_service("install") + # Offer to remove legacy units (hermes.service from pre-rename installs) + # before installing the new hermes-gateway.service. If both remain, they + # flap-fight for the Telegram bot token on every gateway startup. + # Only removes units matching _LEGACY_SERVICE_NAMES + our ExecStart + # signature — profile units are never touched. + if has_legacy_hermes_units(): + print() + print_legacy_unit_warning() + print() + if prompt_yes_no("Remove the legacy unit(s) before installing?", True): + remove_legacy_hermes_units(interactive=False) + print() + unit_path = get_systemd_unit_path(system=system) scope_flag = " --system" if system else "" @@ -1258,6 +1460,7 @@ def systemd_install(force: bool = False, system: bool = False, run_as_user: str _ensure_linger_enabled() print_systemd_scope_conflict_warning() + print_legacy_unit_warning() def systemd_uninstall(system: bool = False): @@ -1381,6 +1584,10 @@ def systemd_status(deep: bool = False, system: bool = False): print_systemd_scope_conflict_warning() print() + if has_legacy_hermes_units(): + print_legacy_unit_warning() + print() + if not systemd_unit_is_current(system=system): print("⚠ Installed gateway service definition is outdated") print(f" Run: {'sudo ' if system else ''}hermes gateway restart{scope_flag} # auto-refreshes the unit") @@ -3137,6 +3344,10 @@ def gateway_setup(): print_systemd_scope_conflict_warning() print() + if supports_systemd_services() and has_legacy_hermes_units(): + print_legacy_unit_warning() + print() + if service_installed and service_running: print_success("Gateway service is installed and running.") elif service_installed: @@ -3594,3 +3805,14 @@ def gateway_command(args): else: print(" hermes gateway install # Install as user service") print(" sudo hermes gateway install --system # Install as boot-time system service") + + elif subcmd == "migrate-legacy": + # Stop, disable, and remove legacy Hermes gateway unit files from + # pre-rename installs (e.g. hermes.service). Profile units and + # unrelated third-party services are never touched. + dry_run = getattr(args, 'dry_run', False) + yes = getattr(args, 'yes', False) + if not supports_systemd_services() and not is_macos(): + print("Legacy unit migration only applies to systemd-based Linux hosts.") + return + remove_legacy_hermes_units(interactive=not yes, dry_run=dry_run) diff --git a/hermes_cli/main.py b/hermes_cli/main.py index f978e5828..051e7b47b 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -6442,6 +6442,31 @@ For more help on a command: # gateway setup gateway_subparsers.add_parser("setup", help="Configure messaging platforms") + # gateway migrate-legacy + gateway_migrate_legacy = gateway_subparsers.add_parser( + "migrate-legacy", + help="Remove legacy hermes.service units from pre-rename installs", + description=( + "Stop, disable, and remove legacy Hermes gateway unit files " + "(e.g. hermes.service) left over from older installs. Profile " + "units (hermes-gateway-.service) and unrelated " + "third-party services are never touched." + ), + ) + gateway_migrate_legacy.add_argument( + "--dry-run", + dest="dry_run", + action="store_true", + help="List what would be removed without doing it", + ) + gateway_migrate_legacy.add_argument( + "-y", + "--yes", + dest="yes", + action="store_true", + help="Skip the confirmation prompt", + ) + gateway_parser.set_defaults(func=cmd_gateway) # ========================================================================= diff --git a/hermes_cli/setup.py b/hermes_cli/setup.py index 95c9cae77..8770386b7 100644 --- a/hermes_cli/setup.py +++ b/hermes_cli/setup.py @@ -2242,8 +2242,10 @@ def setup_gateway(config: dict): _is_service_running, supports_systemd_services, has_conflicting_systemd_units, + has_legacy_hermes_units, install_linux_gateway_from_setup, print_systemd_scope_conflict_warning, + print_legacy_unit_warning, systemd_start, systemd_restart, launchd_install, @@ -2261,6 +2263,10 @@ def setup_gateway(config: dict): print_systemd_scope_conflict_warning() print() + if supports_systemd and has_legacy_hermes_units(): + print_legacy_unit_warning() + print() + if service_running: if prompt_yes_no(" Restart the gateway to pick up changes?", True): try: diff --git a/tests/gateway/test_runner_startup_failures.py b/tests/gateway/test_runner_startup_failures.py index 77bd25ae2..977d66fb3 100644 --- a/tests/gateway/test_runner_startup_failures.py +++ b/tests/gateway/test_runner_startup_failures.py @@ -202,3 +202,120 @@ async def test_start_gateway_replace_force_uses_terminate_pid(monkeypatch, tmp_p assert ok is True assert calls == [(42, False), (42, True)] + + +@pytest.mark.asyncio +async def test_start_gateway_replace_writes_takeover_marker_before_sigterm( + monkeypatch, tmp_path +): + """--replace must write a takeover marker BEFORE sending SIGTERM. + + The marker lets the target's shutdown handler identify the signal as a + planned takeover (→ exit 0) rather than an unexpected kill (→ exit 1). + Without the marker, PR #5646's signal-recovery path would revive the + target via systemd Restart=on-failure, starting a flap loop. + """ + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + # Record the ORDER of marker-write + terminate_pid calls + events: list[str] = [] + marker_paths_seen: list = [] + + def record_write_marker(target_pid: int) -> bool: + events.append(f"write_marker(target_pid={target_pid})") + # Also check that the marker file actually exists after this call + marker_paths_seen.append( + (tmp_path / ".gateway-takeover.json").exists() is False # not yet + ) + # Actually write the marker so we can verify cleanup later + from gateway.status import _get_takeover_marker_path, _write_json_file, _get_process_start_time + _write_json_file(_get_takeover_marker_path(), { + "target_pid": target_pid, + "target_start_time": 0, + "replacer_pid": 100, + "written_at": "2026-04-17T00:00:00+00:00", + }) + return True + + def record_terminate(pid, force=False): + events.append(f"terminate_pid(pid={pid}, force={force})") + + class _CleanExitRunner: + def __init__(self, config): + self.config = config + self.should_exit_cleanly = True + self.exit_reason = None + self.adapters = {} + + async def start(self): + return True + + async def stop(self): + return None + + monkeypatch.setattr("gateway.status.get_running_pid", lambda: 42) + monkeypatch.setattr("gateway.status.remove_pid_file", lambda: None) + monkeypatch.setattr("gateway.status.release_all_scoped_locks", lambda: 0) + monkeypatch.setattr("gateway.status.write_takeover_marker", record_write_marker) + monkeypatch.setattr("gateway.status.terminate_pid", record_terminate) + monkeypatch.setattr("gateway.run.os.getpid", lambda: 100) + # Simulate old process exiting on first check so we don't loop into force-kill + monkeypatch.setattr( + "gateway.run.os.kill", + lambda pid, sig: (_ for _ in ()).throw(ProcessLookupError()), + ) + monkeypatch.setattr("time.sleep", lambda _: None) + monkeypatch.setattr("tools.skills_sync.sync_skills", lambda quiet=True: None) + monkeypatch.setattr("hermes_logging.setup_logging", lambda hermes_home, mode: tmp_path) + monkeypatch.setattr("hermes_logging._add_rotating_handler", lambda *args, **kwargs: None) + monkeypatch.setattr("gateway.run.GatewayRunner", _CleanExitRunner) + + from gateway.run import start_gateway + + ok = await start_gateway(config=GatewayConfig(), replace=True, verbosity=None) + + assert ok is True + # Ordering: marker written BEFORE SIGTERM + assert events[0] == "write_marker(target_pid=42)" + assert any(e.startswith("terminate_pid(pid=42") for e in events[1:]) + # Marker file cleanup: replacer cleans it after loop completes + assert not (tmp_path / ".gateway-takeover.json").exists() + + +@pytest.mark.asyncio +async def test_start_gateway_replace_clears_marker_on_permission_denied( + monkeypatch, tmp_path +): + """If we fail to kill the existing PID (permission denied), clean up the + marker so it doesn't grief an unrelated future shutdown.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + def write_marker(target_pid: int) -> bool: + from gateway.status import _get_takeover_marker_path, _write_json_file + _write_json_file(_get_takeover_marker_path(), { + "target_pid": target_pid, + "target_start_time": 0, + "replacer_pid": 100, + "written_at": "2026-04-17T00:00:00+00:00", + }) + return True + + def raise_permission(pid, force=False): + raise PermissionError("simulated EPERM") + + monkeypatch.setattr("gateway.status.get_running_pid", lambda: 42) + monkeypatch.setattr("gateway.status.write_takeover_marker", write_marker) + monkeypatch.setattr("gateway.status.terminate_pid", raise_permission) + monkeypatch.setattr("gateway.run.os.getpid", lambda: 100) + monkeypatch.setattr("tools.skills_sync.sync_skills", lambda quiet=True: None) + monkeypatch.setattr("hermes_logging.setup_logging", lambda hermes_home, mode: tmp_path) + monkeypatch.setattr("hermes_logging._add_rotating_handler", lambda *args, **kwargs: None) + + from gateway.run import start_gateway + + # Should return False due to permission error + ok = await start_gateway(config=GatewayConfig(), replace=True, verbosity=None) + + assert ok is False + # Marker must NOT be left behind + assert not (tmp_path / ".gateway-takeover.json").exists() diff --git a/tests/gateway/test_status.py b/tests/gateway/test_status.py index 08a12ea7a..04a0856f6 100644 --- a/tests/gateway/test_status.py +++ b/tests/gateway/test_status.py @@ -264,3 +264,181 @@ class TestScopedLocks: status.release_scoped_lock("telegram-bot-token", "secret") assert not lock_path.exists() + + +class TestTakeoverMarker: + """Tests for the --replace takeover marker. + + The marker breaks the post-#5646 flap loop between two gateway services + fighting for the same bot token. The replacer writes a file naming the + target PID + start_time; the target's shutdown handler sees it and exits + 0 instead of 1, so systemd's Restart=on-failure doesn't revive it. + """ + + def test_write_marker_records_target_identity(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 42) + + ok = status.write_takeover_marker(target_pid=12345) + + assert ok is True + marker = tmp_path / ".gateway-takeover.json" + assert marker.exists() + payload = json.loads(marker.read_text()) + assert payload["target_pid"] == 12345 + assert payload["target_start_time"] == 42 + assert payload["replacer_pid"] == os.getpid() + assert "written_at" in payload + + def test_consume_returns_true_when_marker_names_self(self, tmp_path, monkeypatch): + """Primary happy path: planned takeover is recognised.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + # Mark THIS process as the target + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100) + ok = status.write_takeover_marker(target_pid=os.getpid()) + assert ok is True + + # Call consume as if this process just got SIGTERMed + result = status.consume_takeover_marker_for_self() + + assert result is True + # Marker must be unlinked after consumption + assert not (tmp_path / ".gateway-takeover.json").exists() + + def test_consume_returns_false_for_different_pid(self, tmp_path, monkeypatch): + """A marker naming a DIFFERENT process must not be consumed as ours.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100) + # Marker names a different PID + other_pid = os.getpid() + 9999 + ok = status.write_takeover_marker(target_pid=other_pid) + assert ok is True + + result = status.consume_takeover_marker_for_self() + + assert result is False + # Marker IS unlinked even on non-match (the record has been consumed + # and isn't relevant to us — leaving it around would grief a later + # legitimate check). + assert not (tmp_path / ".gateway-takeover.json").exists() + + def test_consume_returns_false_on_start_time_mismatch(self, tmp_path, monkeypatch): + """PID reuse defence: old marker's start_time mismatches current process.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + # Marker says target started at time 100 with our PID + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100) + status.write_takeover_marker(target_pid=os.getpid()) + + # Now change the reported start_time to simulate PID reuse + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 9999) + + result = status.consume_takeover_marker_for_self() + + assert result is False + + def test_consume_returns_false_when_marker_missing(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + result = status.consume_takeover_marker_for_self() + + assert result is False + + def test_consume_returns_false_for_stale_marker(self, tmp_path, monkeypatch): + """A marker older than 60s must be ignored.""" + from datetime import datetime, timezone, timedelta + + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + marker_path = tmp_path / ".gateway-takeover.json" + # Hand-craft a marker written 2 minutes ago + stale_time = (datetime.now(timezone.utc) - timedelta(minutes=2)).isoformat() + marker_path.write_text(json.dumps({ + "target_pid": os.getpid(), + "target_start_time": 123, + "replacer_pid": 99999, + "written_at": stale_time, + })) + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123) + + result = status.consume_takeover_marker_for_self() + + assert result is False + # Stale markers are unlinked so a later legit shutdown isn't griefed + assert not marker_path.exists() + + def test_consume_handles_malformed_marker_gracefully(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + marker_path = tmp_path / ".gateway-takeover.json" + marker_path.write_text("not valid json{") + + # Must not raise + result = status.consume_takeover_marker_for_self() + + assert result is False + + def test_consume_handles_marker_with_missing_fields(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + marker_path = tmp_path / ".gateway-takeover.json" + marker_path.write_text(json.dumps({"only_replacer_pid": 99999})) + + result = status.consume_takeover_marker_for_self() + + assert result is False + # Malformed marker should be cleaned up + assert not marker_path.exists() + + def test_clear_takeover_marker_is_idempotent(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + # Nothing to clear — must not raise + status.clear_takeover_marker() + + # Write then clear + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100) + status.write_takeover_marker(target_pid=12345) + assert (tmp_path / ".gateway-takeover.json").exists() + + status.clear_takeover_marker() + assert not (tmp_path / ".gateway-takeover.json").exists() + + # Clear again — still no error + status.clear_takeover_marker() + + def test_write_marker_returns_false_on_write_failure(self, tmp_path, monkeypatch): + """write_takeover_marker is best-effort; returns False but doesn't raise.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + def raise_oserror(*args, **kwargs): + raise OSError("simulated write failure") + + monkeypatch.setattr(status, "_write_json_file", raise_oserror) + + ok = status.write_takeover_marker(target_pid=12345) + + assert ok is False + + def test_consume_ignores_marker_for_different_process_and_prevents_stale_grief( + self, tmp_path, monkeypatch + ): + """Regression: a stale marker from a dead replacer naming a dead + target must not accidentally cause an unrelated future gateway to + exit 0 on legitimate SIGTERM. + + The distinguishing check is ``target_pid == our_pid AND + target_start_time == our_start_time``. Different PID always wins. + """ + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + marker_path = tmp_path / ".gateway-takeover.json" + # Fresh marker (timestamp is recent) but names a totally different PID + from datetime import datetime, timezone + marker_path.write_text(json.dumps({ + "target_pid": os.getpid() + 10000, + "target_start_time": 42, + "replacer_pid": 99999, + "written_at": datetime.now(timezone.utc).isoformat(), + })) + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 42) + + result = status.consume_takeover_marker_for_self() + + # We are not the target — must NOT consume as planned + assert result is False diff --git a/tests/hermes_cli/test_gateway_service.py b/tests/hermes_cli/test_gateway_service.py index eb2a42348..3c03aab7e 100644 --- a/tests/hermes_cli/test_gateway_service.py +++ b/tests/hermes_cli/test_gateway_service.py @@ -1177,3 +1177,556 @@ class TestDockerAwareGateway: out = capsys.readouterr().out assert "docker" in out.lower() assert "hermes gateway run" in out + + +class TestLegacyHermesUnitDetection: + """Tests for _find_legacy_hermes_units / has_legacy_hermes_units. + + These guard against the scenario that tripped Luis in April 2026: an + older install left a ``hermes.service`` unit behind when the service was + renamed to ``hermes-gateway.service``. After PR #5646 (signal recovery + via systemd), the two services began SIGTERM-flapping over the same + Telegram bot token in a 30-second cycle. + + The detector must flag ``hermes.service`` ONLY when it actually runs our + gateway, and must NEVER flag profile units + (``hermes-gateway-.service``) or unrelated third-party services. + """ + + # Minimal ExecStart that looks like our gateway + _OUR_UNIT_TEXT = ( + "[Unit]\nDescription=Hermes Gateway\n[Service]\n" + "ExecStart=/usr/bin/python -m hermes_cli.main gateway run --replace\n" + ) + + @staticmethod + def _setup_search_paths(tmp_path, monkeypatch): + """Redirect the legacy search to user_dir + system_dir under tmp_path.""" + user_dir = tmp_path / "user" + system_dir = tmp_path / "system" + user_dir.mkdir() + system_dir.mkdir() + monkeypatch.setattr( + gateway_cli, + "_legacy_unit_search_paths", + lambda: [(False, user_dir), (True, system_dir)], + ) + return user_dir, system_dir + + def test_detects_legacy_hermes_service_in_user_scope(self, tmp_path, monkeypatch): + user_dir, _ = self._setup_search_paths(tmp_path, monkeypatch) + legacy = user_dir / "hermes.service" + legacy.write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + + results = gateway_cli._find_legacy_hermes_units() + + assert len(results) == 1 + name, path, is_system = results[0] + assert name == "hermes.service" + assert path == legacy + assert is_system is False + assert gateway_cli.has_legacy_hermes_units() is True + + def test_detects_legacy_hermes_service_in_system_scope(self, tmp_path, monkeypatch): + _, system_dir = self._setup_search_paths(tmp_path, monkeypatch) + legacy = system_dir / "hermes.service" + legacy.write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + + results = gateway_cli._find_legacy_hermes_units() + + assert len(results) == 1 + name, path, is_system = results[0] + assert name == "hermes.service" + assert path == legacy + assert is_system is True + + def test_ignores_profile_unit_hermes_gateway_coder(self, tmp_path, monkeypatch): + """CRITICAL: profile units must NOT be flagged as legacy. + + Teknium's concern — ``hermes-gateway-coder.service`` is our standard + naming for the ``coder`` profile. The legacy detector is an explicit + allowlist, not a glob, so profile units are safe. + """ + user_dir, system_dir = self._setup_search_paths(tmp_path, monkeypatch) + # Drop profile units in BOTH scopes with our ExecStart + for base in (user_dir, system_dir): + (base / "hermes-gateway-coder.service").write_text( + self._OUR_UNIT_TEXT, encoding="utf-8" + ) + (base / "hermes-gateway-orcha.service").write_text( + self._OUR_UNIT_TEXT, encoding="utf-8" + ) + (base / "hermes-gateway.service").write_text( + self._OUR_UNIT_TEXT, encoding="utf-8" + ) + + results = gateway_cli._find_legacy_hermes_units() + + assert results == [] + assert gateway_cli.has_legacy_hermes_units() is False + + def test_ignores_unrelated_hermes_service(self, tmp_path, monkeypatch): + """Third-party ``hermes.service`` that isn't ours stays untouched. + + If a user has some other package named ``hermes`` installed as a + service, we must not flag it. + """ + user_dir, _ = self._setup_search_paths(tmp_path, monkeypatch) + (user_dir / "hermes.service").write_text( + "[Unit]\nDescription=Some Other Hermes\n[Service]\n" + "ExecStart=/opt/other-hermes/bin/daemon --foreground\n", + encoding="utf-8", + ) + + results = gateway_cli._find_legacy_hermes_units() + + assert results == [] + assert gateway_cli.has_legacy_hermes_units() is False + + def test_returns_empty_when_no_legacy_files_exist(self, tmp_path, monkeypatch): + self._setup_search_paths(tmp_path, monkeypatch) + + assert gateway_cli._find_legacy_hermes_units() == [] + assert gateway_cli.has_legacy_hermes_units() is False + + def test_detects_both_scopes_simultaneously(self, tmp_path, monkeypatch): + """When a user has BOTH user-scope and system-scope legacy units, + both are reported so the migration step can remove them together.""" + user_dir, system_dir = self._setup_search_paths(tmp_path, monkeypatch) + (user_dir / "hermes.service").write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + (system_dir / "hermes.service").write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + + results = gateway_cli._find_legacy_hermes_units() + + scopes = sorted(is_system for _, _, is_system in results) + assert scopes == [False, True] + + def test_accepts_alternate_execstart_formats(self, tmp_path, monkeypatch): + """Older installs may have used different python invocations. + + ExecStart variants we've seen in the wild: + - python -m hermes_cli.main gateway run + - python path/to/hermes_cli/main.py gateway run + - hermes gateway run (direct binary) + - python path/to/gateway/run.py + """ + user_dir, _ = self._setup_search_paths(tmp_path, monkeypatch) + variants = [ + "ExecStart=/venv/bin/python -m hermes_cli.main gateway run --replace", + "ExecStart=/venv/bin/python /opt/hermes/hermes_cli/main.py gateway run", + "ExecStart=/usr/local/bin/hermes gateway run --replace", + "ExecStart=/venv/bin/python /opt/hermes/gateway/run.py", + ] + for i, execstart in enumerate(variants): + name = f"hermes.service" if i == 0 else f"hermes.service" # same name + # Test each variant fresh + (user_dir / "hermes.service").write_text( + f"[Unit]\nDescription=Old Hermes\n[Service]\n{execstart}\n", + encoding="utf-8", + ) + results = gateway_cli._find_legacy_hermes_units() + assert len(results) == 1, f"Variant {i} not detected: {execstart!r}" + + def test_print_legacy_unit_warning_is_noop_when_empty(self, tmp_path, monkeypatch, capsys): + self._setup_search_paths(tmp_path, monkeypatch) + + gateway_cli.print_legacy_unit_warning() + out = capsys.readouterr().out + + assert out == "" + + def test_print_legacy_unit_warning_shows_migration_hint(self, tmp_path, monkeypatch, capsys): + user_dir, _ = self._setup_search_paths(tmp_path, monkeypatch) + (user_dir / "hermes.service").write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + + gateway_cli.print_legacy_unit_warning() + out = capsys.readouterr().out + + assert "Legacy" in out + assert "hermes.service" in out + assert "hermes gateway migrate-legacy" in out + + def test_handles_unreadable_unit_file_gracefully(self, tmp_path, monkeypatch): + """A permission error reading a unit file must not crash detection.""" + user_dir, _ = self._setup_search_paths(tmp_path, monkeypatch) + unreadable = user_dir / "hermes.service" + unreadable.write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + # Simulate a read failure — monkeypatch Path.read_text to raise + original_read_text = gateway_cli.Path.read_text + + def raising_read_text(self, *args, **kwargs): + if self == unreadable: + raise PermissionError("simulated") + return original_read_text(self, *args, **kwargs) + + monkeypatch.setattr(gateway_cli.Path, "read_text", raising_read_text) + + # Should not raise + results = gateway_cli._find_legacy_hermes_units() + assert results == [] + + +class TestRemoveLegacyHermesUnits: + """Tests for remove_legacy_hermes_units (the migration action).""" + + _OUR_UNIT_TEXT = ( + "[Unit]\nDescription=Hermes Gateway\n[Service]\n" + "ExecStart=/usr/bin/python -m hermes_cli.main gateway run --replace\n" + ) + + @staticmethod + def _setup(tmp_path, monkeypatch, as_root=False): + user_dir = tmp_path / "user" + system_dir = tmp_path / "system" + user_dir.mkdir() + system_dir.mkdir() + monkeypatch.setattr( + gateway_cli, + "_legacy_unit_search_paths", + lambda: [(False, user_dir), (True, system_dir)], + ) + # Mock systemctl — return success for everything + systemctl_calls: list[list[str]] = [] + + def fake_run(cmd, **kwargs): + systemctl_calls.append(cmd) + return SimpleNamespace(returncode=0, stdout="", stderr="") + + monkeypatch.setattr(gateway_cli.subprocess, "run", fake_run) + monkeypatch.setattr(gateway_cli.os, "geteuid", lambda: 0 if as_root else 1000) + return user_dir, system_dir, systemctl_calls + + def test_returns_zero_when_no_legacy_units(self, tmp_path, monkeypatch, capsys): + self._setup(tmp_path, monkeypatch) + + removed, remaining = gateway_cli.remove_legacy_hermes_units(interactive=False) + + assert removed == 0 + assert remaining == [] + assert "No legacy" in capsys.readouterr().out + + def test_dry_run_lists_without_removing(self, tmp_path, monkeypatch, capsys): + user_dir, _, calls = self._setup(tmp_path, monkeypatch) + legacy = user_dir / "hermes.service" + legacy.write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + + removed, remaining = gateway_cli.remove_legacy_hermes_units( + interactive=False, dry_run=True + ) + + assert removed == 0 + assert remaining == [legacy] + assert legacy.exists() # Not removed + assert calls == [] # No systemctl invocations + out = capsys.readouterr().out + assert "dry-run" in out + + def test_removes_user_scope_legacy_unit(self, tmp_path, monkeypatch, capsys): + user_dir, _, calls = self._setup(tmp_path, monkeypatch) + legacy = user_dir / "hermes.service" + legacy.write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + + removed, remaining = gateway_cli.remove_legacy_hermes_units(interactive=False) + + assert removed == 1 + assert remaining == [] + assert not legacy.exists() + # Must have invoked stop → disable → daemon-reload on user scope + cmds_joined = [" ".join(c) for c in calls] + assert any("--user stop hermes.service" in c for c in cmds_joined) + assert any("--user disable hermes.service" in c for c in cmds_joined) + assert any("--user daemon-reload" in c for c in cmds_joined) + + def test_system_scope_without_root_defers_removal(self, tmp_path, monkeypatch, capsys): + _, system_dir, calls = self._setup(tmp_path, monkeypatch, as_root=False) + legacy = system_dir / "hermes.service" + legacy.write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + + removed, remaining = gateway_cli.remove_legacy_hermes_units(interactive=False) + + assert removed == 0 + assert remaining == [legacy] + assert legacy.exists() # Not removed — requires sudo + out = capsys.readouterr().out + assert "sudo hermes gateway migrate-legacy" in out + + def test_system_scope_with_root_removes(self, tmp_path, monkeypatch, capsys): + _, system_dir, calls = self._setup(tmp_path, monkeypatch, as_root=True) + legacy = system_dir / "hermes.service" + legacy.write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + + removed, remaining = gateway_cli.remove_legacy_hermes_units(interactive=False) + + assert removed == 1 + assert remaining == [] + assert not legacy.exists() + cmds_joined = [" ".join(c) for c in calls] + # System-scope uses plain "systemctl" (no --user) + assert any( + c.startswith("systemctl stop hermes.service") for c in cmds_joined + ) + assert any( + c.startswith("systemctl disable hermes.service") for c in cmds_joined + ) + + def test_removes_both_scopes_with_root(self, tmp_path, monkeypatch, capsys): + user_dir, system_dir, _ = self._setup(tmp_path, monkeypatch, as_root=True) + user_legacy = user_dir / "hermes.service" + system_legacy = system_dir / "hermes.service" + user_legacy.write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + system_legacy.write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + + removed, remaining = gateway_cli.remove_legacy_hermes_units(interactive=False) + + assert removed == 2 + assert remaining == [] + assert not user_legacy.exists() + assert not system_legacy.exists() + + def test_does_not_touch_profile_units_during_migration( + self, tmp_path, monkeypatch, capsys + ): + """Teknium's constraint: profile units (hermes-gateway-coder.service) + must survive a migration call, even if we somehow include them in the + search dir.""" + user_dir, _, _ = self._setup(tmp_path, monkeypatch, as_root=True) + profile_unit = user_dir / "hermes-gateway-coder.service" + profile_unit.write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + default_unit = user_dir / "hermes-gateway.service" + default_unit.write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + + removed, remaining = gateway_cli.remove_legacy_hermes_units(interactive=False) + + assert removed == 0 + assert remaining == [] + # Both the profile unit and the current default unit must survive + assert profile_unit.exists() + assert default_unit.exists() + + def test_interactive_prompt_no_skips_removal(self, tmp_path, monkeypatch, capsys): + """When interactive=True and user answers no, no removal happens.""" + user_dir, _, _ = self._setup(tmp_path, monkeypatch) + legacy = user_dir / "hermes.service" + legacy.write_text(self._OUR_UNIT_TEXT, encoding="utf-8") + + monkeypatch.setattr(gateway_cli, "prompt_yes_no", lambda *a, **k: False) + + removed, remaining = gateway_cli.remove_legacy_hermes_units(interactive=True) + + assert removed == 0 + assert remaining == [legacy] + assert legacy.exists() + + +class TestMigrateLegacyCommand: + """Tests for the `hermes gateway migrate-legacy` subcommand dispatch.""" + + def test_migrate_legacy_subparser_accepts_dry_run_and_yes(self): + """Verify the argparse subparser is registered and parses flags.""" + import hermes_cli.main as cli_main + + parser = cli_main.build_parser() if hasattr(cli_main, "build_parser") else None + # Fall back to calling main's setup helper if direct access isn't exposed + # The key thing: the subparser must exist. We verify by constructing + # a namespace through argparse directly — but if build_parser isn't + # public, just confirm that `hermes gateway --help` shows it. + import subprocess + import sys + + project_root = cli_main.PROJECT_ROOT if hasattr(cli_main, "PROJECT_ROOT") else None + if project_root is None: + import hermes_cli.gateway as gw + project_root = gw.PROJECT_ROOT + + result = subprocess.run( + [sys.executable, "-m", "hermes_cli.main", "gateway", "--help"], + cwd=str(project_root), + capture_output=True, + text=True, + timeout=15, + ) + assert result.returncode == 0 + assert "migrate-legacy" in result.stdout + + def test_gateway_command_migrate_legacy_dispatches( + self, tmp_path, monkeypatch, capsys + ): + """gateway_command(args) with subcmd='migrate-legacy' calls the helper.""" + called = {} + + def fake_remove(interactive=True, dry_run=False): + called["interactive"] = interactive + called["dry_run"] = dry_run + return 0, [] + + monkeypatch.setattr(gateway_cli, "remove_legacy_hermes_units", fake_remove) + monkeypatch.setattr(gateway_cli, "supports_systemd_services", lambda: True) + monkeypatch.setattr(gateway_cli, "is_macos", lambda: False) + + args = SimpleNamespace( + gateway_command="migrate-legacy", dry_run=False, yes=True + ) + gateway_cli.gateway_command(args) + + assert called == {"interactive": False, "dry_run": False} + + def test_gateway_command_migrate_legacy_dry_run_passes_through( + self, monkeypatch + ): + called = {} + + def fake_remove(interactive=True, dry_run=False): + called["interactive"] = interactive + called["dry_run"] = dry_run + return 0, [] + + monkeypatch.setattr(gateway_cli, "remove_legacy_hermes_units", fake_remove) + monkeypatch.setattr(gateway_cli, "supports_systemd_services", lambda: True) + monkeypatch.setattr(gateway_cli, "is_macos", lambda: False) + + args = SimpleNamespace( + gateway_command="migrate-legacy", dry_run=True, yes=False + ) + gateway_cli.gateway_command(args) + + assert called == {"interactive": True, "dry_run": True} + + def test_migrate_legacy_on_unsupported_platform_prints_message( + self, monkeypatch, capsys + ): + monkeypatch.setattr(gateway_cli, "supports_systemd_services", lambda: False) + monkeypatch.setattr(gateway_cli, "is_macos", lambda: False) + + args = SimpleNamespace( + gateway_command="migrate-legacy", dry_run=False, yes=True + ) + gateway_cli.gateway_command(args) + + out = capsys.readouterr().out + assert "only applies to systemd" in out + + +class TestSystemdInstallOffersLegacyRemoval: + """Verify that systemd_install prompts to remove legacy units first.""" + + def test_install_offers_removal_when_legacy_detected( + self, tmp_path, monkeypatch, capsys + ): + """When legacy units exist, install flow should call the removal + helper before writing the new unit.""" + remove_called = {} + + def fake_remove(interactive=True, dry_run=False): + remove_called["invoked"] = True + remove_called["interactive"] = interactive + return 1, [] + + # has_legacy_hermes_units must return True + monkeypatch.setattr(gateway_cli, "has_legacy_hermes_units", lambda: True) + monkeypatch.setattr(gateway_cli, "remove_legacy_hermes_units", fake_remove) + monkeypatch.setattr(gateway_cli, "print_legacy_unit_warning", lambda: None) + # Answer "yes" to the legacy-removal prompt + monkeypatch.setattr(gateway_cli, "prompt_yes_no", lambda *a, **k: True) + + # Mock the rest of the install flow + unit_path = tmp_path / "hermes-gateway.service" + monkeypatch.setattr( + gateway_cli, "get_systemd_unit_path", lambda system=False: unit_path + ) + monkeypatch.setattr( + gateway_cli, + "generate_systemd_unit", + lambda system=False, run_as_user=None: "unit text\n", + ) + monkeypatch.setattr( + gateway_cli.subprocess, + "run", + lambda cmd, **kw: SimpleNamespace(returncode=0, stdout="", stderr=""), + ) + monkeypatch.setattr(gateway_cli, "_ensure_linger_enabled", lambda: None) + + gateway_cli.systemd_install() + + assert remove_called.get("invoked") is True + assert remove_called.get("interactive") is False # prompted elsewhere + + def test_install_declines_legacy_removal_when_user_says_no( + self, tmp_path, monkeypatch + ): + """When legacy units exist and user declines, install still proceeds + but doesn't touch them.""" + remove_called = {"invoked": False} + + def fake_remove(interactive=True, dry_run=False): + remove_called["invoked"] = True + return 0, [] + + monkeypatch.setattr(gateway_cli, "has_legacy_hermes_units", lambda: True) + monkeypatch.setattr(gateway_cli, "remove_legacy_hermes_units", fake_remove) + monkeypatch.setattr(gateway_cli, "print_legacy_unit_warning", lambda: None) + monkeypatch.setattr(gateway_cli, "prompt_yes_no", lambda *a, **k: False) + + unit_path = tmp_path / "hermes-gateway.service" + monkeypatch.setattr( + gateway_cli, "get_systemd_unit_path", lambda system=False: unit_path + ) + monkeypatch.setattr( + gateway_cli, + "generate_systemd_unit", + lambda system=False, run_as_user=None: "unit text\n", + ) + monkeypatch.setattr( + gateway_cli.subprocess, + "run", + lambda cmd, **kw: SimpleNamespace(returncode=0, stdout="", stderr=""), + ) + monkeypatch.setattr(gateway_cli, "_ensure_linger_enabled", lambda: None) + + gateway_cli.systemd_install() + + # Helper must NOT have been called + assert remove_called["invoked"] is False + # New unit should still have been written + assert unit_path.exists() + assert unit_path.read_text() == "unit text\n" + + def test_install_skips_legacy_check_when_none_present( + self, tmp_path, monkeypatch + ): + """No legacy → no prompt, no helper call.""" + prompt_called = {"count": 0} + + def counting_prompt(*a, **k): + prompt_called["count"] += 1 + return True + + remove_called = {"invoked": False} + + def fake_remove(interactive=True, dry_run=False): + remove_called["invoked"] = True + return 0, [] + + monkeypatch.setattr(gateway_cli, "has_legacy_hermes_units", lambda: False) + monkeypatch.setattr(gateway_cli, "remove_legacy_hermes_units", fake_remove) + monkeypatch.setattr(gateway_cli, "prompt_yes_no", counting_prompt) + + unit_path = tmp_path / "hermes-gateway.service" + monkeypatch.setattr( + gateway_cli, "get_systemd_unit_path", lambda system=False: unit_path + ) + monkeypatch.setattr( + gateway_cli, + "generate_systemd_unit", + lambda system=False, run_as_user=None: "unit text\n", + ) + monkeypatch.setattr( + gateway_cli.subprocess, + "run", + lambda cmd, **kw: SimpleNamespace(returncode=0, stdout="", stderr=""), + ) + monkeypatch.setattr(gateway_cli, "_ensure_linger_enabled", lambda: None) + + gateway_cli.systemd_install() + + assert prompt_called["count"] == 0 + assert remove_called["invoked"] is False