fix(gateway): only clear locks belonging to the replaced process

This commit is contained in:
hharry11 2026-04-24 00:27:41 +03:00 committed by Teknium
parent a0d8dd7ba3
commit d0821b0573
4 changed files with 95 additions and 6 deletions

View file

@ -10954,6 +10954,7 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
from gateway.status import ( from gateway.status import (
acquire_gateway_runtime_lock, acquire_gateway_runtime_lock,
get_running_pid, get_running_pid,
get_process_start_time,
release_gateway_runtime_lock, release_gateway_runtime_lock,
remove_pid_file, remove_pid_file,
terminate_pid, terminate_pid,
@ -10961,6 +10962,7 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
existing_pid = get_running_pid() existing_pid = get_running_pid()
if existing_pid is not None and existing_pid != os.getpid(): if existing_pid is not None and existing_pid != os.getpid():
if replace: if replace:
existing_start_time = get_process_start_time(existing_pid)
logger.info( logger.info(
"Replacing existing gateway instance (PID %d) with --replace.", "Replacing existing gateway instance (PID %d) with --replace.",
existing_pid, existing_pid,
@ -11029,7 +11031,10 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
# leaving stale lock files that block the new gateway from starting. # leaving stale lock files that block the new gateway from starting.
try: try:
from gateway.status import release_all_scoped_locks from gateway.status import release_all_scoped_locks
_released = release_all_scoped_locks() _released = release_all_scoped_locks(
owner_pid=existing_pid,
owner_start_time=existing_start_time,
)
if _released: if _released:
logger.info("Released %d stale scoped lock(s) from old gateway.", _released) logger.info("Released %d stale scoped lock(s) from old gateway.", _released)
except Exception: except Exception:

View file

@ -113,6 +113,11 @@ def _get_process_start_time(pid: int) -> Optional[int]:
return None return None
def get_process_start_time(pid: int) -> Optional[int]:
"""Public wrapper for retrieving a process start time when available."""
return _get_process_start_time(pid)
def _read_process_cmdline(pid: int) -> Optional[str]: def _read_process_cmdline(pid: int) -> Optional[str]:
"""Return the process command line as a space-separated string.""" """Return the process command line as a space-separated string."""
cmdline_path = Path(f"/proc/{pid}/cmdline") cmdline_path = Path(f"/proc/{pid}/cmdline")
@ -562,17 +567,43 @@ def release_scoped_lock(scope: str, identity: str) -> None:
pass pass
def release_all_scoped_locks() -> int: def release_all_scoped_locks(
"""Remove all scoped lock files in the lock directory. *,
owner_pid: Optional[int] = None,
owner_start_time: Optional[int] = None,
) -> int:
"""Remove scoped lock files in the lock directory.
Called during --replace to clean up stale locks left by stopped/killed Called during --replace to clean up stale locks left by stopped/killed
gateway processes that did not release their locks gracefully. gateway processes that did not release their locks gracefully. When an
``owner_pid`` is provided, only lock records belonging to that gateway
process are removed. ``owner_start_time`` further narrows the match to
protect against PID reuse.
When no owner is provided, preserves the legacy behavior and removes every
scoped lock file in the directory.
Returns the number of lock files removed. Returns the number of lock files removed.
""" """
lock_dir = _get_lock_dir() lock_dir = _get_lock_dir()
removed = 0 removed = 0
if lock_dir.exists(): if lock_dir.exists():
for lock_file in lock_dir.glob("*.lock"): for lock_file in lock_dir.glob("*.lock"):
if owner_pid is not None:
record = _read_json_file(lock_file)
if not isinstance(record, dict):
continue
try:
record_pid = int(record.get("pid"))
except (TypeError, ValueError):
continue
if record_pid != owner_pid:
continue
if (
owner_start_time is not None
and record.get("start_time") != owner_start_time
):
continue
try: try:
lock_file.unlink(missing_ok=True) lock_file.unlink(missing_ok=True)
removed += 1 removed += 1

View file

@ -193,7 +193,10 @@ async def test_start_gateway_replace_force_uses_terminate_pid(monkeypatch, tmp_p
_pid_state["alive"] = False _pid_state["alive"] = False
monkeypatch.setattr("gateway.status.get_running_pid", _mock_get_running_pid) monkeypatch.setattr("gateway.status.get_running_pid", _mock_get_running_pid)
monkeypatch.setattr("gateway.status.remove_pid_file", _mock_remove_pid_file) monkeypatch.setattr("gateway.status.remove_pid_file", _mock_remove_pid_file)
monkeypatch.setattr("gateway.status.release_all_scoped_locks", lambda: 0) monkeypatch.setattr(
"gateway.status.release_all_scoped_locks",
lambda **kwargs: 0,
)
monkeypatch.setattr("gateway.status.terminate_pid", lambda pid, force=False: calls.append((pid, force))) monkeypatch.setattr("gateway.status.terminate_pid", lambda pid, force=False: calls.append((pid, force)))
monkeypatch.setattr("gateway.run.os.getpid", lambda: 100) monkeypatch.setattr("gateway.run.os.getpid", lambda: 100)
monkeypatch.setattr("gateway.run.os.kill", lambda pid, sig: None) monkeypatch.setattr("gateway.run.os.kill", lambda pid, sig: None)
@ -267,7 +270,10 @@ async def test_start_gateway_replace_writes_takeover_marker_before_sigterm(
_pid_state["alive"] = False _pid_state["alive"] = False
monkeypatch.setattr("gateway.status.get_running_pid", _mock_get_running_pid) monkeypatch.setattr("gateway.status.get_running_pid", _mock_get_running_pid)
monkeypatch.setattr("gateway.status.remove_pid_file", _mock_remove_pid_file) monkeypatch.setattr("gateway.status.remove_pid_file", _mock_remove_pid_file)
monkeypatch.setattr("gateway.status.release_all_scoped_locks", lambda: 0) monkeypatch.setattr(
"gateway.status.release_all_scoped_locks",
lambda **kwargs: 0,
)
monkeypatch.setattr("gateway.status.write_takeover_marker", record_write_marker) monkeypatch.setattr("gateway.status.write_takeover_marker", record_write_marker)
monkeypatch.setattr("gateway.status.terminate_pid", record_terminate) monkeypatch.setattr("gateway.status.terminate_pid", record_terminate)
monkeypatch.setattr("gateway.run.os.getpid", lambda: 100) monkeypatch.setattr("gateway.run.os.getpid", lambda: 100)

View file

@ -404,6 +404,53 @@ class TestScopedLocks:
status.release_scoped_lock("telegram-bot-token", "secret") status.release_scoped_lock("telegram-bot-token", "secret")
assert not lock_path.exists() assert not lock_path.exists()
def test_release_all_scoped_locks_can_target_single_owner(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
lock_dir = tmp_path / "locks"
lock_dir.mkdir(parents=True, exist_ok=True)
target_lock = lock_dir / "telegram-bot-token-target.lock"
other_lock = lock_dir / "slack-app-token-other.lock"
target_lock.write_text(json.dumps({
"pid": 111,
"start_time": 222,
"kind": "hermes-gateway",
}))
other_lock.write_text(json.dumps({
"pid": 999,
"start_time": 333,
"kind": "hermes-gateway",
}))
removed = status.release_all_scoped_locks(
owner_pid=111,
owner_start_time=222,
)
assert removed == 1
assert not target_lock.exists()
assert other_lock.exists()
def test_release_all_scoped_locks_skips_pid_reuse_mismatch(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
lock_dir = tmp_path / "locks"
lock_dir.mkdir(parents=True, exist_ok=True)
reused_pid_lock = lock_dir / "telegram-bot-token-reused.lock"
reused_pid_lock.write_text(json.dumps({
"pid": 111,
"start_time": 999,
"kind": "hermes-gateway",
}))
removed = status.release_all_scoped_locks(
owner_pid=111,
owner_start_time=222,
)
assert removed == 0
assert reused_pid_lock.exists()
class TestTakeoverMarker: class TestTakeoverMarker:
"""Tests for the --replace takeover marker. """Tests for the --replace takeover marker.