mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): implement platform-aware PID termination
This commit is contained in:
parent
9bb8cb8d83
commit
00dd5cc491
6 changed files with 138 additions and 25 deletions
|
|
@ -7582,7 +7582,7 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
|||
# setups (each profile using a distinct HERMES_HOME) will naturally
|
||||
# allow concurrent instances without tripping this guard.
|
||||
import time as _time
|
||||
from gateway.status import get_running_pid, remove_pid_file
|
||||
from gateway.status import get_running_pid, remove_pid_file, terminate_pid
|
||||
existing_pid = get_running_pid()
|
||||
if existing_pid is not None and existing_pid != os.getpid():
|
||||
if replace:
|
||||
|
|
@ -7591,10 +7591,10 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
|||
existing_pid,
|
||||
)
|
||||
try:
|
||||
os.kill(existing_pid, signal.SIGTERM)
|
||||
terminate_pid(existing_pid, force=False)
|
||||
except ProcessLookupError:
|
||||
pass # Already gone
|
||||
except PermissionError:
|
||||
except (PermissionError, OSError):
|
||||
logger.error(
|
||||
"Permission denied killing PID %d. Cannot replace.",
|
||||
existing_pid,
|
||||
|
|
@ -7614,9 +7614,9 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
|||
existing_pid,
|
||||
)
|
||||
try:
|
||||
os.kill(existing_pid, signal.SIGKILL)
|
||||
terminate_pid(existing_pid, force=True)
|
||||
_time.sleep(0.5)
|
||||
except (ProcessLookupError, PermissionError):
|
||||
except (ProcessLookupError, PermissionError, OSError):
|
||||
pass
|
||||
remove_pid_file()
|
||||
# Also release all scoped locks left by the old process.
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@ concurrently under distinct configurations).
|
|||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
|
@ -23,6 +25,7 @@ from typing import Any, Optional
|
|||
_GATEWAY_KIND = "hermes-gateway"
|
||||
_RUNTIME_STATUS_FILE = "gateway_state.json"
|
||||
_LOCKS_DIRNAME = "gateway-locks"
|
||||
_IS_WINDOWS = sys.platform == "win32"
|
||||
|
||||
|
||||
def _get_pid_path() -> Path:
|
||||
|
|
@ -49,6 +52,33 @@ def _utc_now_iso() -> str:
|
|||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def terminate_pid(pid: int, *, force: bool = False) -> None:
|
||||
"""Terminate a PID with platform-appropriate force semantics.
|
||||
|
||||
POSIX uses SIGTERM/SIGKILL. Windows uses taskkill /T /F for true force-kill
|
||||
because os.kill(..., SIGTERM) is not equivalent to a tree-killing hard stop.
|
||||
"""
|
||||
if force and _IS_WINDOWS:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["taskkill", "/PID", str(pid), "/T", "/F"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
)
|
||||
except FileNotFoundError:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
return
|
||||
|
||||
if result.returncode != 0:
|
||||
details = (result.stderr or result.stdout or "").strip()
|
||||
raise OSError(details or f"taskkill failed for PID {pid}")
|
||||
return
|
||||
|
||||
sig = signal.SIGTERM if not force else getattr(signal, "SIGKILL", signal.SIGTERM)
|
||||
os.kill(pid, sig)
|
||||
|
||||
|
||||
def _scope_hash(identity: str) -> str:
|
||||
return hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16]
|
||||
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ from pathlib import Path
|
|||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
|
||||
|
||||
from gateway.status import terminate_pid
|
||||
from hermes_cli.config import get_env_value, get_hermes_home, save_env_value, is_managed, managed_error
|
||||
# display_hermes_home is imported lazily at call sites to avoid ImportError
|
||||
# when hermes_constants is cached from a pre-update version during `hermes update`.
|
||||
|
|
@ -162,7 +163,7 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
|
|||
"""Kill any running gateway processes. Returns count killed.
|
||||
|
||||
Args:
|
||||
force: Use SIGKILL instead of SIGTERM.
|
||||
force: Use the platform's force-kill mechanism instead of graceful terminate.
|
||||
exclude_pids: PIDs to skip (e.g. service-managed PIDs that were just
|
||||
restarted and should not be killed).
|
||||
"""
|
||||
|
|
@ -171,10 +172,7 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
|
|||
|
||||
for pid in pids:
|
||||
try:
|
||||
if force and not is_windows():
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
else:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
terminate_pid(pid, force=force)
|
||||
killed += 1
|
||||
except ProcessLookupError:
|
||||
# Process already gone
|
||||
|
|
@ -182,6 +180,8 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
|
|||
except PermissionError:
|
||||
print(f"⚠ Permission denied to kill PID {pid}")
|
||||
|
||||
except OSError as exc:
|
||||
print(f"Failed to kill PID {pid}: {exc}")
|
||||
return killed
|
||||
|
||||
|
||||
|
|
@ -1220,7 +1220,7 @@ def _wait_for_gateway_exit(timeout: float = 10.0, force_after: float = 5.0):
|
|||
|
||||
Args:
|
||||
timeout: Total seconds to wait before giving up.
|
||||
force_after: Seconds of graceful waiting before sending SIGKILL.
|
||||
force_after: Seconds of graceful waiting before escalating to force-kill.
|
||||
"""
|
||||
import time
|
||||
from gateway.status import get_running_pid
|
||||
|
|
@ -1237,15 +1237,15 @@ def _wait_for_gateway_exit(timeout: float = 10.0, force_after: float = 5.0):
|
|||
if not force_sent and time.monotonic() >= force_deadline:
|
||||
# Grace period expired — force-kill the specific PID.
|
||||
try:
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
terminate_pid(pid, force=True)
|
||||
print(f"⚠ Gateway PID {pid} did not exit gracefully; sent SIGKILL")
|
||||
except (ProcessLookupError, PermissionError):
|
||||
except (ProcessLookupError, PermissionError, OSError):
|
||||
return # Already gone or we can't touch it.
|
||||
force_sent = True
|
||||
|
||||
time.sleep(0.3)
|
||||
|
||||
# Timed out even after SIGKILL.
|
||||
# Timed out even after force-kill.
|
||||
remaining_pid = get_running_pid()
|
||||
if remaining_pid is not None:
|
||||
print(f"⚠ Gateway PID {remaining_pid} still running after {timeout}s — restart may fail")
|
||||
|
|
|
|||
|
|
@ -87,3 +87,42 @@ async def test_runner_allows_cron_only_mode_when_no_platforms_are_enabled(monkey
|
|||
assert runner.adapters == {}
|
||||
state = read_runtime_status()
|
||||
assert state["gateway_state"] == "running"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_gateway_replace_force_uses_terminate_pid(monkeypatch, tmp_path):
|
||||
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||
|
||||
calls = []
|
||||
|
||||
class _CleanExitRunner:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
self.should_exit_cleanly = True
|
||||
self.exit_reason = None
|
||||
self.adapters = {}
|
||||
|
||||
async def start(self):
|
||||
return True
|
||||
|
||||
async def stop(self):
|
||||
return None
|
||||
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 42)
|
||||
monkeypatch.setattr("gateway.status.remove_pid_file", lambda: None)
|
||||
monkeypatch.setattr("gateway.status.release_all_scoped_locks", lambda: 0)
|
||||
monkeypatch.setattr("gateway.status.terminate_pid", lambda pid, force=False: calls.append((pid, force)))
|
||||
monkeypatch.setattr("gateway.run.os.getpid", lambda: 100)
|
||||
monkeypatch.setattr("gateway.run.os.kill", lambda pid, sig: None)
|
||||
monkeypatch.setattr("time.sleep", lambda _: None)
|
||||
monkeypatch.setattr("tools.skills_sync.sync_skills", lambda quiet=True: None)
|
||||
monkeypatch.setattr("hermes_logging.setup_logging", lambda hermes_home, mode: tmp_path)
|
||||
monkeypatch.setattr("hermes_logging._add_rotating_handler", lambda *args, **kwargs: None)
|
||||
monkeypatch.setattr("gateway.run.GatewayRunner", _CleanExitRunner)
|
||||
|
||||
from gateway.run import start_gateway
|
||||
|
||||
ok = await start_gateway(config=GatewayConfig(), replace=True, verbosity=None)
|
||||
|
||||
assert ok is True
|
||||
assert calls == [(42, False), (42, True)]
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
import json
|
||||
import os
|
||||
from types import SimpleNamespace
|
||||
|
||||
from gateway import status
|
||||
|
||||
|
|
@ -104,6 +105,41 @@ class TestGatewayRuntimeStatus:
|
|||
assert payload["platforms"]["telegram"]["error_message"] == "another poller is active"
|
||||
|
||||
|
||||
class TestTerminatePid:
|
||||
def test_force_uses_taskkill_on_windows(self, monkeypatch):
|
||||
calls = []
|
||||
monkeypatch.setattr(status, "_IS_WINDOWS", True)
|
||||
|
||||
def fake_run(cmd, capture_output=False, text=False, timeout=None):
|
||||
calls.append((cmd, capture_output, text, timeout))
|
||||
return SimpleNamespace(returncode=0, stdout="", stderr="")
|
||||
|
||||
monkeypatch.setattr(status.subprocess, "run", fake_run)
|
||||
|
||||
status.terminate_pid(123, force=True)
|
||||
|
||||
assert calls == [
|
||||
(["taskkill", "/PID", "123", "/T", "/F"], True, True, 10)
|
||||
]
|
||||
|
||||
def test_force_falls_back_to_sigterm_when_taskkill_missing(self, monkeypatch):
|
||||
calls = []
|
||||
monkeypatch.setattr(status, "_IS_WINDOWS", True)
|
||||
|
||||
def fake_run(*args, **kwargs):
|
||||
raise FileNotFoundError
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
calls.append((pid, sig))
|
||||
|
||||
monkeypatch.setattr(status.subprocess, "run", fake_run)
|
||||
monkeypatch.setattr(status.os, "kill", fake_kill)
|
||||
|
||||
status.terminate_pid(456, force=True)
|
||||
|
||||
assert calls == [(456, status.signal.SIGTERM)]
|
||||
|
||||
|
||||
class TestScopedLocks:
|
||||
def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
"""Tests for hermes_cli.gateway."""
|
||||
|
||||
import signal
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import patch, call
|
||||
|
||||
|
|
@ -211,8 +210,7 @@ class TestWaitForGatewayExit:
|
|||
assert poll_count == 3
|
||||
|
||||
def test_force_kills_after_grace_period(self, monkeypatch):
|
||||
"""When the process doesn't exit, SIGKILL the saved PID."""
|
||||
import time as _time
|
||||
"""When the process doesn't exit, force-kill the saved PID."""
|
||||
|
||||
# Simulate monotonic time advancing past force_after
|
||||
call_num = 0
|
||||
|
|
@ -224,8 +222,8 @@ class TestWaitForGatewayExit:
|
|||
return call_num * 2.0 # 2, 4, 6, 8, ...
|
||||
|
||||
kills = []
|
||||
def mock_kill(pid, sig):
|
||||
kills.append((pid, sig))
|
||||
def mock_terminate(pid, force=False):
|
||||
kills.append((pid, force))
|
||||
|
||||
# get_running_pid returns the PID until kill is sent, then None
|
||||
def mock_get_running_pid():
|
||||
|
|
@ -234,14 +232,13 @@ class TestWaitForGatewayExit:
|
|||
monkeypatch.setattr("time.monotonic", fake_monotonic)
|
||||
monkeypatch.setattr("time.sleep", lambda _: None)
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", mock_get_running_pid)
|
||||
monkeypatch.setattr("os.kill", mock_kill)
|
||||
monkeypatch.setattr(gateway, "terminate_pid", mock_terminate)
|
||||
|
||||
gateway._wait_for_gateway_exit(timeout=10.0, force_after=5.0)
|
||||
assert (42, signal.SIGKILL) in kills
|
||||
assert (42, True) in kills
|
||||
|
||||
def test_handles_process_already_gone_on_kill(self, monkeypatch):
|
||||
"""ProcessLookupError during SIGKILL is not fatal."""
|
||||
import time as _time
|
||||
"""ProcessLookupError during force-kill is not fatal."""
|
||||
|
||||
call_num = 0
|
||||
def fake_monotonic():
|
||||
|
|
@ -249,13 +246,24 @@ class TestWaitForGatewayExit:
|
|||
call_num += 1
|
||||
return call_num * 3.0 # Jump past force_after quickly
|
||||
|
||||
def mock_kill(pid, sig):
|
||||
def mock_terminate(pid, force=False):
|
||||
raise ProcessLookupError
|
||||
|
||||
monkeypatch.setattr("time.monotonic", fake_monotonic)
|
||||
monkeypatch.setattr("time.sleep", lambda _: None)
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 99)
|
||||
monkeypatch.setattr("os.kill", mock_kill)
|
||||
monkeypatch.setattr(gateway, "terminate_pid", mock_terminate)
|
||||
|
||||
# Should not raise — ProcessLookupError means it's already gone.
|
||||
gateway._wait_for_gateway_exit(timeout=10.0, force_after=2.0)
|
||||
|
||||
def test_kill_gateway_processes_force_uses_helper(self, monkeypatch):
|
||||
calls = []
|
||||
|
||||
monkeypatch.setattr(gateway, "find_gateway_pids", lambda exclude_pids=None: [11, 22])
|
||||
monkeypatch.setattr(gateway, "terminate_pid", lambda pid, force=False: calls.append((pid, force)))
|
||||
|
||||
killed = gateway.kill_gateway_processes(force=True)
|
||||
|
||||
assert killed == 2
|
||||
assert calls == [(11, True), (22, True)]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue