hermes-agent/tests/gateway/test_status.py
Teknium 9b55365f6f
fix(gateway,cron): close ephemeral agents + reap stale aux clients (salvage #13979) (#16598)
* fix: clean gateway auxiliary client caches on teardown

* fix(gateway): recover from stale pid files and close cron agents

Two issues were keeping the gateway from surviving long runs:

1. `_cleanup_invalid_pid_path` delegated to `remove_pid_file`, which
   refuses to unlink when the file's pid differs from our own. That
   safety check exists for the --replace atexit handoff, but it also
   applied to stale-record cleanup, so after a crashy exit the pid
   file was orphaned: `write_pid_file()`'s O_EXCL create then failed
   with `FileExistsError`, and systemd looped on "PID file race lost
   to another gateway instance". Unlink unconditionally from this
   helper since the caller has already verified the record is dead.

2. The cron scheduler never closed the ephemeral `AIAgent` it creates
   per tick, and never swept the process-global auxiliary-client
   cache. Over days of 10-minute ticks this leaked subprocesses and
   async httpx transports until the gateway hit EMFILE. Release the
   agent and call `cleanup_stale_async_clients()` in `run_job`'s
   outer `finally`, matching the gateway's own per-turn cleanup.

* chore(release): map bloodcarter@gmail.com -> bloodcarter

---------

Co-authored-by: bloodcarter <bloodcarter@gmail.com>
2026-04-27 07:41:42 -07:00

653 lines
26 KiB
Python

"""Tests for gateway runtime status tracking."""
import json
import os
from types import SimpleNamespace
from gateway import status
class TestGatewayPidState:
    """Tests for gateway PID-file bookkeeping and runtime-lock liveness.

    Most tests point ``HERMES_HOME`` at ``tmp_path`` so that ``gateway.pid``
    and ``gateway.lock`` live in a throwaway directory; one test passes an
    explicit PID path instead.
    """

    def test_write_pid_file_records_gateway_metadata(self, tmp_path, monkeypatch):
        """The written record carries our pid, the gateway kind and a non-empty argv list."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        status.write_pid_file()

        payload = json.loads((tmp_path / "gateway.pid").read_text())
        assert payload["pid"] == os.getpid()
        assert payload["kind"] == "hermes-gateway"
        assert isinstance(payload["argv"], list)
        assert payload["argv"]

    def test_write_pid_file_is_atomic_against_concurrent_writers(self, tmp_path, monkeypatch):
        """Regression: two concurrent --replace invocations must not both win.

        Without O_CREAT|O_EXCL, two processes racing through start_gateway()'s
        termination-wait would both write to gateway.pid, silently overwriting
        each other and leaving multiple gateway instances alive (#11718).
        """
        import pytest

        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        # First write wins.
        status.write_pid_file()
        assert (tmp_path / "gateway.pid").exists()

        # Second write (simulating a racing --replace that missed the earlier
        # guards) must raise FileExistsError rather than clobber the record.
        with pytest.raises(FileExistsError):
            status.write_pid_file()

        # Original record is preserved.
        payload = json.loads((tmp_path / "gateway.pid").read_text())
        assert payload["pid"] == os.getpid()

    def test_get_running_pid_rejects_live_non_gateway_pid(self, tmp_path, monkeypatch):
        """A PID file holding only our (live) pid, without gateway metadata, is rejected and removed."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        pid_path = tmp_path / "gateway.pid"
        pid_path.write_text(str(os.getpid()))

        assert status.get_running_pid() is None
        assert not pid_path.exists()

    def test_get_running_pid_cleans_stale_record_from_dead_process(self, tmp_path, monkeypatch):
        """A record naming a dead process yields ``None`` and the PID file is unlinked."""
        # Simulates the aftermath of a crash: the PID file still points at a
        # process that no longer exists.  The next gateway startup must be
        # able to unlink it so ``write_pid_file``'s O_EXCL create succeeds —
        # otherwise systemd's restart loop hits "PID file race lost" forever.
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        pid_path = tmp_path / "gateway.pid"
        dead_pid = 999999  # not our pid, and below we simulate it's dead
        pid_path.write_text(json.dumps({
            "pid": dead_pid,
            "kind": "hermes-gateway",
            "argv": ["python", "-m", "hermes_cli.main", "gateway", "run"],
            "start_time": 111,
        }))

        def _dead_process(pid, sig):
            raise ProcessLookupError

        monkeypatch.setattr(status.os, "kill", _dead_process)

        assert status.get_running_pid() is None
        assert not pid_path.exists()

    def test_get_running_pid_accepts_gateway_metadata_when_cmdline_unavailable(self, tmp_path, monkeypatch):
        """With the runtime lock held, gateway metadata is accepted even when the cmdline reads as ``None``."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        pid_path = tmp_path / "gateway.pid"
        pid_path.write_text(json.dumps({
            "pid": os.getpid(),
            "kind": "hermes-gateway",
            "argv": ["python", "-m", "hermes_cli.main", "gateway"],
            "start_time": 123,
        }))

        monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
        monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None)

        assert status.acquire_gateway_runtime_lock() is True
        try:
            assert status.get_running_pid() == os.getpid()
        finally:
            # Always release so a failed assertion does not leak the lock.
            status.release_gateway_runtime_lock()

    def test_get_running_pid_accepts_script_style_gateway_cmdline(self, tmp_path, monkeypatch):
        """A ``python /path/main.py gateway run --replace`` style cmdline is accepted while the lock is held."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        pid_path = tmp_path / "gateway.pid"
        pid_path.write_text(json.dumps({
            "pid": os.getpid(),
            "kind": "hermes-gateway",
            "argv": ["/venv/bin/python", "/repo/hermes_cli/main.py", "gateway", "run", "--replace"],
            "start_time": 123,
        }))

        monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
        monkeypatch.setattr(
            status,
            "_read_process_cmdline",
            lambda pid: "/venv/bin/python /repo/hermes_cli/main.py gateway run --replace",
        )

        assert status.acquire_gateway_runtime_lock() is True
        try:
            assert status.get_running_pid() == os.getpid()
        finally:
            status.release_gateway_runtime_lock()

    def test_get_running_pid_accepts_explicit_pid_path_without_cleanup(self, tmp_path, monkeypatch):
        """An explicit ``pid_path`` with ``cleanup_stale=False`` returns the pid and keeps the file."""
        other_home = tmp_path / "profile-home"
        other_home.mkdir()
        pid_path = other_home / "gateway.pid"
        pid_path.write_text(json.dumps({
            "pid": os.getpid(),
            "kind": "hermes-gateway",
            "argv": ["python", "-m", "hermes_cli.main", "gateway"],
            "start_time": 123,
        }))

        monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
        monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None)

        lock_path = other_home / "gateway.lock"
        lock_path.write_text(json.dumps({
            "pid": os.getpid(),
            "kind": "hermes-gateway",
            "argv": ["python", "-m", "hermes_cli.main", "gateway"],
            "start_time": 123,
        }))
        # The lock file above is a plain file, so liveness is stubbed to True.
        monkeypatch.setattr(status, "is_gateway_runtime_lock_active", lambda lock_path=None: True)

        assert status.get_running_pid(pid_path, cleanup_stale=False) == os.getpid()
        assert pid_path.exists()

    def test_runtime_lock_claims_and_releases_liveness(self, tmp_path, monkeypatch):
        """The lock reads inactive, then active after acquire, then inactive again after release."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        assert status.is_gateway_runtime_lock_active() is False
        assert status.acquire_gateway_runtime_lock() is True
        assert status.is_gateway_runtime_lock_active() is True

        status.release_gateway_runtime_lock()

        assert status.is_gateway_runtime_lock_active() is False

    def test_get_running_pid_treats_pid_file_as_stale_without_runtime_lock(self, tmp_path, monkeypatch):
        """A plausible-looking record is still stale (and removed) when no runtime lock is held."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        pid_path = tmp_path / "gateway.pid"
        pid_path.write_text(json.dumps({
            "pid": os.getpid(),
            "kind": "hermes-gateway",
            "argv": ["python", "-m", "hermes_cli.main", "gateway"],
            "start_time": 123,
        }))

        monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
        monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None)

        assert status.get_running_pid() is None
        assert not pid_path.exists()

    def test_get_running_pid_cleans_stale_metadata_from_dead_foreign_pid(self, tmp_path, monkeypatch):
        """Stale PID file from a *different* PID (crashed process) must still be cleaned.

        Regression for: ``remove_pid_file()`` defensively refuses to delete a
        PID file whose pid != ``os.getpid()`` to protect ``--replace``
        handoffs.  Stale-cleanup must not go through that path or real
        crashed-process PID files never get removed.
        """
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        pid_path = tmp_path / "gateway.pid"
        lock_path = tmp_path / "gateway.lock"

        # A foreign PID that we *force* to look dead.  The number alone is no
        # guarantee: hosts with a large pid_max can have a live process
        # 999999, which would make this test depend on the machine it runs on.
        dead_foreign_pid = 999999
        assert dead_foreign_pid != os.getpid()

        def _dead_process(pid, sig):
            raise ProcessLookupError

        monkeypatch.setattr(status.os, "kill", _dead_process)

        pid_path.write_text(json.dumps({
            "pid": dead_foreign_pid,
            "kind": "hermes-gateway",
            "argv": ["python", "-m", "hermes_cli.main", "gateway"],
            "start_time": 123,
        }))
        lock_path.write_text(json.dumps({
            "pid": dead_foreign_pid,
            "kind": "hermes-gateway",
            "argv": ["python", "-m", "hermes_cli.main", "gateway"],
            "start_time": 123,
        }))

        # No live lock holder → get_running_pid should clean both files.
        assert status.get_running_pid() is None
        assert not pid_path.exists()
        assert not lock_path.exists()

    def test_get_running_pid_falls_back_to_live_lock_record(self, tmp_path, monkeypatch):
        """When the PID file names a dead pid but the held lock names us, our pid is returned."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        pid_path = tmp_path / "gateway.pid"
        pid_path.write_text(json.dumps({
            "pid": 99999,
            "kind": "hermes-gateway",
            "argv": ["python", "-m", "hermes_cli.main", "gateway"],
            "start_time": 123,
        }))

        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
        monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None)
        monkeypatch.setattr(
            status,
            "_build_pid_record",
            lambda: {
                "pid": os.getpid(),
                "kind": "hermes-gateway",
                "argv": ["python", "-m", "hermes_cli.main", "gateway"],
                "start_time": 123,
            },
        )
        assert status.acquire_gateway_runtime_lock() is True

        def fake_kill(pid, sig):
            # Only the pid from the PID file is dead; every other pid looks alive.
            if pid == 99999:
                raise ProcessLookupError
            return None

        monkeypatch.setattr(status.os, "kill", fake_kill)

        try:
            assert status.get_running_pid() == os.getpid()
        finally:
            status.release_gateway_runtime_lock()
class TestGatewayRuntimeStatus:
    """Round-trip checks for ``write_runtime_status`` / ``read_runtime_status``."""

    def test_write_runtime_status_overwrites_stale_pid_on_restart(self, tmp_path, monkeypatch):
        """Regression: setdefault() preserved stale PID from previous process (#1631)."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        # Seed the state file as though an earlier gateway run had left it behind.
        leftover_state = {
            "pid": 99999,
            "start_time": 1000.0,
            "kind": "hermes-gateway",
            "platforms": {},
            "updated_at": "2025-01-01T00:00:00Z",
        }
        (tmp_path / "gateway_state.json").write_text(json.dumps(leftover_state))

        status.write_runtime_status(gateway_state="running")
        refreshed = status.read_runtime_status()

        assert refreshed["pid"] == os.getpid(), "PID should be overwritten, not preserved via setdefault"
        assert refreshed["start_time"] != 1000.0, "start_time should be overwritten on restart"

    def test_write_runtime_status_records_platform_failure(self, tmp_path, monkeypatch):
        """Gateway-level and per-platform failure details are readable after a write."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        failure_fields = {
            "gateway_state": "startup_failed",
            "exit_reason": "telegram conflict",
            "platform": "telegram",
            "platform_state": "fatal",
            "error_code": "telegram_polling_conflict",
            "error_message": "another poller is active",
        }
        status.write_runtime_status(**failure_fields)

        snapshot = status.read_runtime_status()
        assert snapshot["gateway_state"] == "startup_failed"
        assert snapshot["exit_reason"] == "telegram conflict"

        telegram = snapshot["platforms"]["telegram"]
        assert telegram["state"] == "fatal"
        assert telegram["error_code"] == "telegram_polling_conflict"
        assert telegram["error_message"] == "another poller is active"

    def test_write_runtime_status_explicit_none_clears_stale_fields(self, tmp_path, monkeypatch):
        """Passing ``None`` explicitly on a later write clears the earlier error fields."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        # First write: record a failure for the discord platform.
        failed_write = {
            "gateway_state": "startup_failed",
            "exit_reason": "stale error",
            "platform": "discord",
            "platform_state": "fatal",
            "error_code": "discord_timeout",
            "error_message": "stale platform error",
        }
        status.write_runtime_status(**failed_write)

        # Second write: healthy state, with every error field explicitly None.
        recovered_write = {
            "gateway_state": "running",
            "exit_reason": None,
            "platform": "discord",
            "platform_state": "connected",
            "error_code": None,
            "error_message": None,
        }
        status.write_runtime_status(**recovered_write)

        snapshot = status.read_runtime_status()
        assert snapshot["gateway_state"] == "running"
        assert snapshot["exit_reason"] is None

        discord = snapshot["platforms"]["discord"]
        assert discord["state"] == "connected"
        assert discord["error_code"] is None
        assert discord["error_message"] is None
class TestTerminatePid:
    """Checks ``terminate_pid(..., force=True)`` with ``_IS_WINDOWS`` patched to ``True``."""

    def test_force_uses_taskkill_on_windows(self, monkeypatch):
        """A forced terminate issues exactly one ``taskkill /PID <pid> /T /F`` call."""
        recorded = []

        def fake_run(cmd, capture_output=False, text=False, timeout=None):
            # Capture the exact arguments so they can be compared below.
            recorded.append((cmd, capture_output, text, timeout))
            return SimpleNamespace(returncode=0, stdout="", stderr="")

        monkeypatch.setattr(status, "_IS_WINDOWS", True)
        monkeypatch.setattr(status.subprocess, "run", fake_run)

        status.terminate_pid(123, force=True)

        expected_call = (["taskkill", "/PID", "123", "/T", "/F"], True, True, 10)
        assert recorded == [expected_call]

    def test_force_falls_back_to_sigterm_when_taskkill_missing(self, monkeypatch):
        """If running ``taskkill`` raises ``FileNotFoundError``, SIGTERM is sent via ``os.kill``."""
        signals_sent = []

        def fake_run(*args, **kwargs):
            raise FileNotFoundError

        def fake_kill(pid, sig):
            signals_sent.append((pid, sig))

        monkeypatch.setattr(status, "_IS_WINDOWS", True)
        monkeypatch.setattr(status.subprocess, "run", fake_run)
        monkeypatch.setattr(status.os, "kill", fake_kill)

        status.terminate_pid(456, force=True)

        assert signals_sent == [(456, status.signal.SIGTERM)]
class TestScopedLocks:
    """Tests for scoped lock files kept under ``HERMES_GATEWAY_LOCK_DIR``."""

    def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch):
        """A lock whose recorded holder looks alive blocks acquisition and is handed back."""
        locks_dir = tmp_path / "locks"
        monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(locks_dir))
        locks_dir.mkdir(parents=True, exist_ok=True)

        holder_record = {
            "pid": 99999,
            "start_time": 123,
            "kind": "hermes-gateway",
        }
        lock_file = locks_dir / "telegram-bot-token-2bb80d537b1da3e3.lock"
        lock_file.write_text(json.dumps(holder_record))

        # Make pid 99999 look alive with a start time matching the record.
        monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)

        acquired, existing = status.acquire_scoped_lock(
            "telegram-bot-token",
            "secret",
            metadata={"platform": "telegram"},
        )

        assert acquired is False
        assert existing["pid"] == 99999

    def test_acquire_scoped_lock_replaces_stale_record(self, tmp_path, monkeypatch):
        """A lock whose recorded holder is dead is overwritten with our own record."""
        locks_dir = tmp_path / "locks"
        monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(locks_dir))
        locks_dir.mkdir(parents=True, exist_ok=True)

        stale_record = {
            "pid": 99999,
            "start_time": 123,
            "kind": "hermes-gateway",
        }
        lock_file = locks_dir / "telegram-bot-token-2bb80d537b1da3e3.lock"
        lock_file.write_text(json.dumps(stale_record))

        def fake_kill(pid, sig):
            raise ProcessLookupError

        monkeypatch.setattr(status.os, "kill", fake_kill)

        acquired, _ = status.acquire_scoped_lock(
            "telegram-bot-token",
            "secret",
            metadata={"platform": "telegram"},
        )

        assert acquired is True
        written = json.loads(lock_file.read_text())
        assert written["pid"] == os.getpid()
        assert written["metadata"]["platform"] == "telegram"

    def test_acquire_scoped_lock_recovers_empty_lock_file(self, tmp_path, monkeypatch):
        """Empty lock file (0 bytes) left by a crashed process should be treated as stale."""
        locks_dir = tmp_path / "locks"
        monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(locks_dir))
        locks_dir.mkdir(parents=True, exist_ok=True)

        lock_file = locks_dir / "slack-app-token-2bb80d537b1da3e3.lock"
        # Simulate a crash between O_CREAT and json.dump.
        lock_file.write_text("")

        acquired, _ = status.acquire_scoped_lock(
            "slack-app-token",
            "secret",
            metadata={"platform": "slack"},
        )

        assert acquired is True
        written = json.loads(lock_file.read_text())
        assert written["pid"] == os.getpid()
        assert written["metadata"]["platform"] == "slack"

    def test_acquire_scoped_lock_recovers_corrupt_lock_file(self, tmp_path, monkeypatch):
        """Lock file with invalid JSON should be treated as stale."""
        locks_dir = tmp_path / "locks"
        monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(locks_dir))
        locks_dir.mkdir(parents=True, exist_ok=True)

        lock_file = locks_dir / "slack-app-token-2bb80d537b1da3e3.lock"
        # Simulate a partial write.
        lock_file.write_text("{truncated")

        acquired, _ = status.acquire_scoped_lock(
            "slack-app-token",
            "secret",
            metadata={"platform": "slack"},
        )

        assert acquired is True
        written = json.loads(lock_file.read_text())
        assert written["pid"] == os.getpid()

    def test_release_scoped_lock_only_removes_current_owner(self, tmp_path, monkeypatch):
        """A lock we acquired ourselves exists on disk and is gone after release."""
        locks_dir = tmp_path / "locks"
        monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(locks_dir))

        acquired, _ = status.acquire_scoped_lock(
            "telegram-bot-token",
            "secret",
            metadata={"platform": "telegram"},
        )
        assert acquired is True

        lock_file = locks_dir / "telegram-bot-token-2bb80d537b1da3e3.lock"
        assert lock_file.exists()

        status.release_scoped_lock("telegram-bot-token", "secret")

        assert not lock_file.exists()

    def test_release_all_scoped_locks_can_target_single_owner(self, tmp_path, monkeypatch):
        """Only the lock matching the given owner pid and start time is removed."""
        locks_dir = tmp_path / "locks"
        monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(locks_dir))
        locks_dir.mkdir(parents=True, exist_ok=True)

        target_lock = locks_dir / "telegram-bot-token-target.lock"
        other_lock = locks_dir / "slack-app-token-other.lock"
        seeded = (
            (target_lock, 111, 222),
            (other_lock, 999, 333),
        )
        for lock_file, holder_pid, holder_start in seeded:
            lock_file.write_text(json.dumps({
                "pid": holder_pid,
                "start_time": holder_start,
                "kind": "hermes-gateway",
            }))

        removed = status.release_all_scoped_locks(owner_pid=111, owner_start_time=222)

        assert removed == 1
        assert not target_lock.exists()
        assert other_lock.exists()

    def test_release_all_scoped_locks_skips_pid_reuse_mismatch(self, tmp_path, monkeypatch):
        """A lock with the same pid but a different start time is left untouched."""
        locks_dir = tmp_path / "locks"
        monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(locks_dir))
        locks_dir.mkdir(parents=True, exist_ok=True)

        # Same pid as the owner we ask about, but a different start time.
        reused_record = {
            "pid": 111,
            "start_time": 999,
            "kind": "hermes-gateway",
        }
        reused_pid_lock = locks_dir / "telegram-bot-token-reused.lock"
        reused_pid_lock.write_text(json.dumps(reused_record))

        removed = status.release_all_scoped_locks(owner_pid=111, owner_start_time=222)

        assert removed == 0
        assert reused_pid_lock.exists()
class TestTakeoverMarker:
    """Tests for the marker file written during a ``--replace`` takeover.

    The marker exists to break the post-#5646 flap loop in which two gateway
    services fight over the same bot token.  The replacer records the target's
    PID and start_time in a file; the target's shutdown handler finds it and
    exits 0 rather than 1, so systemd's Restart=on-failure does not revive it.
    """

    @staticmethod
    def _marker_file(home):
        """Return the takeover-marker path inside *home*."""
        return home / ".gateway-takeover.json"

    def test_write_marker_records_target_identity(self, tmp_path, monkeypatch):
        """The marker stores target pid, target start time, our pid and a timestamp."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 42)

        written = status.write_takeover_marker(target_pid=12345)
        assert written is True

        marker = self._marker_file(tmp_path)
        assert marker.exists()

        record = json.loads(marker.read_text())
        assert record["target_pid"] == 12345
        assert record["target_start_time"] == 42
        assert record["replacer_pid"] == os.getpid()
        assert "written_at" in record

    def test_consume_returns_true_when_marker_names_self(self, tmp_path, monkeypatch):
        """Primary happy path: planned takeover is recognised."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100)

        # Name THIS process as the takeover target.
        written = status.write_takeover_marker(target_pid=os.getpid())
        assert written is True

        # Consume as though this process had just received SIGTERM.
        consumed = status.consume_takeover_marker_for_self()
        assert consumed is True

        # The marker is removed once consumed.
        assert not self._marker_file(tmp_path).exists()

    def test_consume_returns_false_for_different_pid(self, tmp_path, monkeypatch):
        """A marker naming a DIFFERENT process must not be consumed as ours."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100)

        # The marker targets some pid other than ours.
        foreign_pid = os.getpid() + 9999
        written = status.write_takeover_marker(target_pid=foreign_pid)
        assert written is True

        consumed = status.consume_takeover_marker_for_self()
        assert consumed is False

        # The marker is unlinked even when it does not match: the record has
        # been consumed and is irrelevant to us, and leaving it behind would
        # grief a later legitimate check.
        assert not self._marker_file(tmp_path).exists()

    def test_consume_returns_false_on_start_time_mismatch(self, tmp_path, monkeypatch):
        """PID reuse defence: old marker's start_time mismatches current process."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        # Write a marker for our pid while the start time reads as 100 ...
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100)
        status.write_takeover_marker(target_pid=os.getpid())

        # ... then report a different start time to simulate PID reuse.
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 9999)

        consumed = status.consume_takeover_marker_for_self()
        assert consumed is False

    def test_consume_returns_false_when_marker_missing(self, tmp_path, monkeypatch):
        """With no marker file present, consuming reports ``False``."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        consumed = status.consume_takeover_marker_for_self()

        assert consumed is False

    def test_consume_returns_false_for_stale_marker(self, tmp_path, monkeypatch):
        """A marker older than 60s must be ignored."""
        from datetime import datetime, timezone, timedelta

        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker = self._marker_file(tmp_path)

        # Hand-build a marker stamped two minutes in the past.
        two_minutes_ago = datetime.now(timezone.utc) - timedelta(minutes=2)
        stale_record = {
            "target_pid": os.getpid(),
            "target_start_time": 123,
            "replacer_pid": 99999,
            "written_at": two_minutes_ago.isoformat(),
        }
        marker.write_text(json.dumps(stale_record))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)

        consumed = status.consume_takeover_marker_for_self()
        assert consumed is False

        # Stale markers are removed so a later legitimate shutdown isn't griefed.
        assert not marker.exists()

    def test_consume_handles_malformed_marker_gracefully(self, tmp_path, monkeypatch):
        """A marker containing invalid JSON yields ``False`` instead of raising."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        self._marker_file(tmp_path).write_text("not valid json{")

        # Must not raise.
        consumed = status.consume_takeover_marker_for_self()

        assert consumed is False

    def test_consume_handles_marker_with_missing_fields(self, tmp_path, monkeypatch):
        """A JSON marker lacking the expected keys yields ``False`` and is removed."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker = self._marker_file(tmp_path)
        incomplete_record = {"only_replacer_pid": 99999}
        marker.write_text(json.dumps(incomplete_record))

        consumed = status.consume_takeover_marker_for_self()
        assert consumed is False

        # The malformed marker should have been cleaned up.
        assert not marker.exists()

    def test_clear_takeover_marker_is_idempotent(self, tmp_path, monkeypatch):
        """Clearing works with no marker, with a marker, and again afterwards."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker = self._marker_file(tmp_path)

        # Nothing to clear yet: this must not raise.
        status.clear_takeover_marker()

        # Write a marker, then clear it.
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100)
        status.write_takeover_marker(target_pid=12345)
        assert marker.exists()

        status.clear_takeover_marker()
        assert not marker.exists()

        # A second clear is still not an error.
        status.clear_takeover_marker()

    def test_write_marker_returns_false_on_write_failure(self, tmp_path, monkeypatch):
        """write_takeover_marker is best-effort; returns False but doesn't raise."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        def raise_oserror(*args, **kwargs):
            raise OSError("simulated write failure")

        monkeypatch.setattr(status, "_write_json_file", raise_oserror)

        written = status.write_takeover_marker(target_pid=12345)

        assert written is False

    def test_consume_ignores_marker_for_different_process_and_prevents_stale_grief(
        self, tmp_path, monkeypatch
    ):
        """Regression: a stale marker from a dead replacer naming a dead
        target must not accidentally cause an unrelated future gateway to
        exit 0 on legitimate SIGTERM.

        The distinguishing check is ``target_pid == our_pid AND
        target_start_time == our_start_time``.  Different PID always wins.
        """
        from datetime import datetime, timezone

        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        # A fresh marker (recent timestamp) that names an entirely different PID.
        foreign_record = {
            "target_pid": os.getpid() + 10000,
            "target_start_time": 42,
            "replacer_pid": 99999,
            "written_at": datetime.now(timezone.utc).isoformat(),
        }
        self._marker_file(tmp_path).write_text(json.dumps(foreign_record))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 42)

        consumed = status.consume_takeover_marker_for_self()

        # We are not the target, so this must NOT count as a planned takeover.
        assert consumed is False