fix: unify OpenClaw detection, add isatty guard, fix print_warning import

Combines detection from both PRs into _detect_openclaw_processes():
- Cross-platform process scan (pgrep/tasklist/PowerShell) from PR #8102
- systemd service check from PR #8555
- Returns list[str] with details about what's found

Fixes in cleanup warning (from PR #8555):
- print_warning -> print_error/print_info (print_warning not in import chain)
- Added isatty() guard for non-interactive sessions
- Removed duplicate _check_openclaw_running() in favor of shared function

Updated all tests to match new API.
This commit is contained in:
Teknium 2026-04-12 16:40:10 -07:00 committed by Teknium
parent 76f7411fca
commit c83674dd77
2 changed files with 108 additions and 91 deletions

View file

@ -53,21 +53,39 @@ _OPENCLAW_SCRIPT_INSTALLED = (
# Known OpenClaw directory names (current + legacy) # Known OpenClaw directory names (current + legacy)
_OPENCLAW_DIR_NAMES = (".openclaw", ".clawdbot", ".moltbot") _OPENCLAW_DIR_NAMES = (".openclaw", ".clawdbot", ".moltbot")
def _is_openclaw_running() -> bool: def _detect_openclaw_processes() -> list[str]:
"""Check whether an OpenClaw process appears to be running.""" """Detect running OpenClaw processes and services.
Returns a list of human-readable descriptions of what was found.
An empty list means nothing was detected.
"""
found: list[str] = []
# -- systemd service (Linux) ------------------------------------------
if sys.platform != "win32":
try:
result = subprocess.run(
["systemctl", "--user", "is-active", "openclaw-gateway.service"],
capture_output=True, text=True, timeout=5,
)
if result.stdout.strip() == "active":
found.append("systemd service: openclaw-gateway.service")
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# -- process scan ------------------------------------------------------
if sys.platform == "win32": if sys.platform == "win32":
try: try:
# First check for dedicated executables
for exe in ("openclaw.exe", "clawd.exe"): for exe in ("openclaw.exe", "clawd.exe"):
result = subprocess.run( result = subprocess.run(
["tasklist", "/FI", f"IMAGENAME eq {exe}"], ["tasklist", "/FI", f"IMAGENAME eq {exe}"],
capture_output=True, text=True, timeout=5 capture_output=True, text=True, timeout=5,
) )
if exe in result.stdout.lower(): if exe in result.stdout.lower():
return True found.append(f"process: {exe}")
# Check node.exe processes for openclaw/clawd in command line. # Node.js-hosted OpenClaw — tasklist doesn't show command lines,
# tasklist does not include command lines, so we use PowerShell. # so fall back to PowerShell.
ps_cmd = ( ps_cmd = (
'Get-CimInstance Win32_Process -Filter "Name = \'node.exe\'" | ' 'Get-CimInstance Win32_Process -Filter "Name = \'node.exe\'" | '
'Where-Object { $_.CommandLine -match "openclaw|clawd" } | ' 'Where-Object { $_.CommandLine -match "openclaw|clawd" } | '
@ -75,20 +93,25 @@ def _is_openclaw_running() -> bool:
) )
result = subprocess.run( result = subprocess.run(
["powershell", "-NoProfile", "-Command", ps_cmd], ["powershell", "-NoProfile", "-Command", ps_cmd],
capture_output=True, text=True, timeout=5 capture_output=True, text=True, timeout=5,
) )
return bool(result.stdout.strip()) if result.stdout.strip():
found.append(f"node.exe process with openclaw in command line (PID {result.stdout.strip()})")
except Exception: except Exception:
return False pass
else:
for cmd in (["pgrep", "-f", "openclaw"], ["pgrep", "-f", "clawd"]):
try: try:
result = subprocess.run(cmd, capture_output=True, timeout=3) result = subprocess.run(
["pgrep", "-f", "openclaw"],
capture_output=True, text=True, timeout=3,
)
if result.returncode == 0: if result.returncode == 0:
return True pids = result.stdout.strip().split()
found.append(f"openclaw process(es) (PIDs: {', '.join(pids)})")
except (FileNotFoundError, subprocess.TimeoutExpired): except (FileNotFoundError, subprocess.TimeoutExpired):
continue pass
return False
return found
def _warn_if_openclaw_running(auto_yes: bool) -> None: def _warn_if_openclaw_running(auto_yes: bool) -> None:
@ -98,11 +121,14 @@ def _warn_if_openclaw_running(auto_yes: bool) -> None:
token. Migrating while OpenClaw is running causes both to fight for the token. Migrating while OpenClaw is running causes both to fight for the
same token. same token.
""" """
if not _is_openclaw_running(): running = _detect_openclaw_processes()
if not running:
return return
print() print()
print_error("OpenClaw appears to be running.") print_error("OpenClaw appears to be running:")
for detail in running:
print_info(f" * {detail}")
print_info( print_info(
"Messaging platforms (Telegram, Discord, Slack) only allow one " "Messaging platforms (Telegram, Discord, Slack) only allow one "
"active session per bot token. If you continue, both OpenClaw and " "active session per bot token. If you continue, both OpenClaw and "
@ -229,34 +255,6 @@ def _scan_workspace_state(source_dir: Path) -> list[tuple[Path, str]]:
return findings return findings
def _check_openclaw_running() -> list:
"""Check if any OpenClaw processes or services are still running."""
import subprocess
running = []
# Check systemd service
try:
result = subprocess.run(
["systemctl", "--user", "is-active", "openclaw-gateway.service"],
capture_output=True, text=True, timeout=5
)
if result.stdout.strip() == "active":
running.append("systemd service: openclaw-gateway.service")
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# Check running processes
try:
result = subprocess.run(
["pgrep", "-f", "openclaw"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
pids = result.stdout.strip().split()
running.append(f"openclaw process(es) running (PIDs: {', '.join(pids)})")
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
return running
def _archive_directory(source_dir: Path, dry_run: bool = False) -> Path: def _archive_directory(source_dir: Path, dry_run: bool = False) -> Path:
"""Rename an OpenClaw directory to .pre-migration. """Rename an OpenClaw directory to .pre-migration.
@ -528,18 +526,25 @@ def _cmd_cleanup(args):
print() print()
print_success("No OpenClaw directories found. Nothing to clean up.") print_success("No OpenClaw directories found. Nothing to clean up.")
return return
# Warn if OpenClaw is still running
running = _check_openclaw_running() # Warn if OpenClaw is still running — archiving while the service is
# active causes it to recreate an empty skeleton directory (#8502).
running = _detect_openclaw_processes()
if running: if running:
print() print()
print_warning("OpenClaw appears to be still running:") print_error("OpenClaw appears to be still running:")
for proc in running: for detail in running:
print_warning(f"{proc}") print_info(f" * {detail}")
print_warning("Archiving .openclaw/ while the service is active may cause it to") print_info(
print_warning("immediately recreate an empty skeleton directory, destroying your config.") "Archiving .openclaw/ while the service is active may cause it to "
print_warning("Stop OpenClaw first: systemctl --user stop openclaw-gateway.service") "immediately recreate an empty skeleton directory, destroying your config."
)
print_info("Stop OpenClaw first: systemctl --user stop openclaw-gateway.service")
print() print()
if not auto_yes: if not auto_yes:
if not sys.stdin.isatty():
print_info("Non-interactive session — aborting. Stop OpenClaw and re-run.")
return
if not prompt_yes_no("Proceed anyway?", default=False): if not prompt_yes_no("Proceed anyway?", default=False):
print_info("Aborted. Stop OpenClaw first, then re-run: hermes claw cleanup") print_info("Aborted. Stop OpenClaw first, then re-run: hermes claw cleanup")
return return

View file

@ -1,6 +1,7 @@
"""Tests for hermes claw commands.""" """Tests for hermes claw commands."""
from argparse import Namespace from argparse import Namespace
import subprocess
from types import ModuleType from types import ModuleType
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
@ -199,7 +200,7 @@ class TestCmdMigrate:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def _mock_openclaw_running(self): def _mock_openclaw_running(self):
with patch.object(claw_mod, "_is_openclaw_running", return_value=False): with patch.object(claw_mod, "_detect_openclaw_processes", return_value=[]):
yield yield
def test_error_when_source_missing(self, tmp_path, capsys): def test_error_when_source_missing(self, tmp_path, capsys):
@ -633,81 +634,92 @@ class TestPrintMigrationReport:
assert "Nothing to migrate" in captured.out assert "Nothing to migrate" in captured.out
class TestIsOpenclawRunning: class TestDetectOpenclawProcesses:
def test_returns_true_when_pgrep_finds_openclaw(self): def test_returns_match_when_pgrep_finds_openclaw(self):
with patch.object(claw_mod, "sys") as mock_sys:
mock_sys.platform = "linux"
with patch.object(claw_mod, "subprocess") as mock_subprocess:
# systemd check misses, pgrep finds openclaw
mock_subprocess.run.side_effect = [
MagicMock(returncode=1, stdout=""), # systemctl
MagicMock(returncode=0, stdout="1234\n"), # pgrep
]
mock_subprocess.TimeoutExpired = subprocess.TimeoutExpired
result = claw_mod._detect_openclaw_processes()
assert len(result) == 1
assert "1234" in result[0]
def test_returns_empty_when_pgrep_finds_nothing(self):
with patch.object(claw_mod, "sys") as mock_sys: with patch.object(claw_mod, "sys") as mock_sys:
mock_sys.platform = "darwin" mock_sys.platform = "darwin"
with patch.object(claw_mod, "subprocess") as mock_subprocess: with patch.object(claw_mod, "subprocess") as mock_subprocess:
mock_subprocess.run.side_effect = [ mock_subprocess.run.side_effect = [
MagicMock(returncode=0), MagicMock(returncode=1, stdout=""), # systemctl (not found)
MagicMock(returncode=1, stdout=""), # pgrep
] ]
assert claw_mod._is_openclaw_running() is True mock_subprocess.TimeoutExpired = subprocess.TimeoutExpired
result = claw_mod._detect_openclaw_processes()
assert result == []
def test_returns_true_when_pgrep_finds_clawd(self): def test_detects_systemd_service(self):
with patch.object(claw_mod, "sys") as mock_sys: with patch.object(claw_mod, "sys") as mock_sys:
mock_sys.platform = "linux" mock_sys.platform = "linux"
with patch.object(claw_mod, "subprocess") as mock_subprocess: with patch.object(claw_mod, "subprocess") as mock_subprocess:
mock_subprocess.run.side_effect = [ mock_subprocess.run.side_effect = [
MagicMock(returncode=1), MagicMock(returncode=0, stdout="active\n"), # systemctl
MagicMock(returncode=0), MagicMock(returncode=1, stdout=""), # pgrep
] ]
assert claw_mod._is_openclaw_running() is True mock_subprocess.TimeoutExpired = subprocess.TimeoutExpired
result = claw_mod._detect_openclaw_processes()
assert len(result) == 1
assert "systemd" in result[0]
def test_returns_false_when_pgrep_finds_nothing(self): def test_returns_match_on_windows_when_openclaw_exe_running(self):
with patch.object(claw_mod, "sys") as mock_sys:
mock_sys.platform = "darwin"
with patch.object(claw_mod, "subprocess") as mock_subprocess:
mock_subprocess.run.side_effect = [
MagicMock(returncode=1),
MagicMock(returncode=1),
]
assert claw_mod._is_openclaw_running() is False
def test_returns_true_on_windows_when_openclaw_exe_running(self):
with patch.object(claw_mod, "sys") as mock_sys: with patch.object(claw_mod, "sys") as mock_sys:
mock_sys.platform = "win32" mock_sys.platform = "win32"
with patch.object(claw_mod, "subprocess") as mock_subprocess: with patch.object(claw_mod, "subprocess") as mock_subprocess:
# First tasklist (openclaw.exe) matches
mock_subprocess.run.side_effect = [ mock_subprocess.run.side_effect = [
MagicMock(returncode=0, stdout="openclaw.exe 1234 Console 1 45,056 K\n"), MagicMock(returncode=0, stdout="openclaw.exe 1234 Console 1 45,056 K\n"),
] ]
assert claw_mod._is_openclaw_running() is True result = claw_mod._detect_openclaw_processes()
assert len(result) >= 1
assert any("openclaw.exe" in r for r in result)
def test_returns_true_on_windows_when_node_exe_has_openclaw_in_cmdline(self): def test_returns_match_on_windows_when_node_exe_has_openclaw_in_cmdline(self):
with patch.object(claw_mod, "sys") as mock_sys: with patch.object(claw_mod, "sys") as mock_sys:
mock_sys.platform = "win32" mock_sys.platform = "win32"
with patch.object(claw_mod, "subprocess") as mock_subprocess: with patch.object(claw_mod, "subprocess") as mock_subprocess:
# tasklist for openclaw.exe and clawd.exe both miss,
# PowerShell finds a matching node.exe process.
mock_subprocess.run.side_effect = [ mock_subprocess.run.side_effect = [
MagicMock(returncode=0, stdout=""), MagicMock(returncode=0, stdout=""), # tasklist openclaw.exe
MagicMock(returncode=0, stdout=""), MagicMock(returncode=0, stdout=""), # tasklist clawd.exe
MagicMock(returncode=0, stdout="1234\n"), MagicMock(returncode=0, stdout="1234\n"), # PowerShell
] ]
assert claw_mod._is_openclaw_running() is True result = claw_mod._detect_openclaw_processes()
assert len(result) >= 1
assert any("node.exe" in r for r in result)
def test_returns_false_on_windows_when_node_exe_has_no_openclaw_in_cmdline(self): def test_returns_empty_on_windows_when_nothing_found(self):
with patch.object(claw_mod, "sys") as mock_sys: with patch.object(claw_mod, "sys") as mock_sys:
mock_sys.platform = "win32" mock_sys.platform = "win32"
with patch.object(claw_mod, "subprocess") as mock_subprocess: with patch.object(claw_mod, "subprocess") as mock_subprocess:
# Neither dedicated exe nor PowerShell find anything.
mock_subprocess.run.side_effect = [ mock_subprocess.run.side_effect = [
MagicMock(returncode=0, stdout=""), MagicMock(returncode=0, stdout=""),
MagicMock(returncode=0, stdout=""), MagicMock(returncode=0, stdout=""),
MagicMock(returncode=0, stdout=""), MagicMock(returncode=0, stdout=""),
] ]
assert claw_mod._is_openclaw_running() is False result = claw_mod._detect_openclaw_processes()
assert result == []
class TestWarnIfOpenclawRunning: class TestWarnIfOpenclawRunning:
def test_noop_when_not_running(self, capsys): def test_noop_when_not_running(self, capsys):
with patch.object(claw_mod, "_is_openclaw_running", return_value=False): with patch.object(claw_mod, "_detect_openclaw_processes", return_value=[]):
claw_mod._warn_if_openclaw_running(auto_yes=False) claw_mod._warn_if_openclaw_running(auto_yes=False)
captured = capsys.readouterr() captured = capsys.readouterr()
assert captured.out == "" assert captured.out == ""
def test_warns_and_exits_when_running_and_user_declines(self, capsys): def test_warns_and_exits_when_running_and_user_declines(self, capsys):
with patch.object(claw_mod, "_is_openclaw_running", return_value=True): with patch.object(claw_mod, "_detect_openclaw_processes", return_value=["openclaw process(es) (PIDs: 1234)"]):
with patch.object(claw_mod, "prompt_yes_no", return_value=False): with patch.object(claw_mod, "prompt_yes_no", return_value=False):
with patch.object(claw_mod.sys.stdin, "isatty", return_value=True): with patch.object(claw_mod.sys.stdin, "isatty", return_value=True):
with pytest.raises(SystemExit) as exc_info: with pytest.raises(SystemExit) as exc_info:
@ -717,7 +729,7 @@ class TestWarnIfOpenclawRunning:
assert "OpenClaw appears to be running" in captured.out assert "OpenClaw appears to be running" in captured.out
def test_warns_and_continues_when_running_and_user_accepts(self, capsys): def test_warns_and_continues_when_running_and_user_accepts(self, capsys):
with patch.object(claw_mod, "_is_openclaw_running", return_value=True): with patch.object(claw_mod, "_detect_openclaw_processes", return_value=["openclaw process(es) (PIDs: 1234)"]):
with patch.object(claw_mod, "prompt_yes_no", return_value=True): with patch.object(claw_mod, "prompt_yes_no", return_value=True):
with patch.object(claw_mod.sys.stdin, "isatty", return_value=True): with patch.object(claw_mod.sys.stdin, "isatty", return_value=True):
claw_mod._warn_if_openclaw_running(auto_yes=False) claw_mod._warn_if_openclaw_running(auto_yes=False)
@ -725,13 +737,13 @@ class TestWarnIfOpenclawRunning:
assert "OpenClaw appears to be running" in captured.out assert "OpenClaw appears to be running" in captured.out
def test_warns_and_continues_in_auto_yes_mode(self, capsys): def test_warns_and_continues_in_auto_yes_mode(self, capsys):
with patch.object(claw_mod, "_is_openclaw_running", return_value=True): with patch.object(claw_mod, "_detect_openclaw_processes", return_value=["openclaw process(es) (PIDs: 1234)"]):
claw_mod._warn_if_openclaw_running(auto_yes=True) claw_mod._warn_if_openclaw_running(auto_yes=True)
captured = capsys.readouterr() captured = capsys.readouterr()
assert "OpenClaw appears to be running" in captured.out assert "OpenClaw appears to be running" in captured.out
def test_warns_and_continues_in_non_interactive_session(self, capsys): def test_warns_and_continues_in_non_interactive_session(self, capsys):
with patch.object(claw_mod, "_is_openclaw_running", return_value=True): with patch.object(claw_mod, "_detect_openclaw_processes", return_value=["openclaw process(es) (PIDs: 1234)"]):
with patch.object(claw_mod.sys.stdin, "isatty", return_value=False): with patch.object(claw_mod.sys.stdin, "isatty", return_value=False):
claw_mod._warn_if_openclaw_running(auto_yes=False) claw_mod._warn_if_openclaw_running(auto_yes=False)
captured = capsys.readouterr() captured = capsys.readouterr()