mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix: two process leaks (agent-browser daemons, paste.rs sleepers) (#11843)
Both fixes close process leaks observed in production (18+ orphaned
agent-browser node daemons, 15+ orphaned paste.rs sleep interpreters
accumulated over ~3 days, ~2.7 GB RSS).
## agent-browser daemon leak
Previously the orphan reaper (_reap_orphaned_browser_sessions) only ran
from _start_browser_cleanup_thread, which is only invoked on the first
browser tool call in a process. Hermes sessions that never used the
browser never swept orphans, and the cross-process orphan detection
relied on in-process _active_sessions, which doesn't see other hermes
PIDs' sessions (race risk).
- Write <session>.owner_pid alongside the socket dir recording the
hermes PID that owns the daemon (extracted into _write_owner_pid for
direct testability).
- Reaper prefers owner_pid liveness over in-process _active_sessions.
Cross-process safe: concurrent hermes instances won't reap each
other's daemons. Legacy tracked_names fallback kept for daemons
that predate owner_pid.
- atexit handler (_emergency_cleanup_all_sessions) now always runs
the reaper, not just when this process had active sessions —
every clean hermes exit sweeps accumulated orphans.
## paste.rs auto-delete leak
_schedule_auto_delete spawned a detached Python subprocess per call
that slept 6 hours then issued DELETE requests. No dedup, no tracking —
every 'hermes debug share' invocation added ~20 MB of resident Python
interpreters that stuck around until the sleep finished.
- Replaced the spawn with ~/.hermes/pastes/pending.json: records
{url, expire_at} entries.
- _sweep_expired_pastes() synchronously DELETEs past-due entries on
every 'hermes debug' invocation (run_debug() dispatcher).
- Network failures stay in pending.json for up to 24h, then give up
(paste.rs's own retention handles the 'user never runs hermes again'
edge case).
- Zero subprocesses; regression test asserts subprocess/Popen/time.sleep
never appear in the function source (skipping docstrings via AST).
## Validation
| | Before | After |
|------------------------------|---------------|--------------|
| Orphan agent-browser daemons | 18 accumulated| 2 (live) |
| paste.rs sleep interpreters | 15 accumulated| 0 |
| RSS reclaimed | - | ~2.7 GB |
| Targeted tests | - | 2253 pass |
E2E verified: alive-owner daemons NOT reaped; dead-owner daemons
SIGTERM'd and socket dirs cleaned; pending.json sweep deletes expired
entries without spawning subprocesses.
This commit is contained in:
parent
64b354719f
commit
304fb921bf
4 changed files with 736 additions and 80 deletions
|
|
@ -6,7 +6,10 @@ Currently supports:
|
|||
"""
|
||||
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
|
|
@ -31,6 +34,119 @@ _MAX_LOG_BYTES = 512_000
|
|||
_AUTO_DELETE_SECONDS = 21600
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pending-deletion tracking (replaces the old fork-and-sleep subprocess).
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _pending_file() -> Path:
|
||||
"""Path to ``~/.hermes/pastes/pending.json``.
|
||||
|
||||
Each entry: ``{"url": "...", "expire_at": <unix_ts>}``. Scheduled
|
||||
DELETEs used to be handled by spawning a detached Python process per
|
||||
paste that slept for 6 hours; those accumulated forever if the user
|
||||
ran ``hermes debug share`` repeatedly. We now persist the schedule
|
||||
to disk and sweep expired entries on the next debug invocation.
|
||||
"""
|
||||
return get_hermes_home() / "pastes" / "pending.json"
|
||||
|
||||
|
||||
def _load_pending() -> list[dict]:
|
||||
path = _pending_file()
|
||||
if not path.exists():
|
||||
return []
|
||||
try:
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
if isinstance(data, list):
|
||||
# Filter to well-formed entries only
|
||||
return [
|
||||
e for e in data
|
||||
if isinstance(e, dict) and "url" in e and "expire_at" in e
|
||||
]
|
||||
except (OSError, ValueError, json.JSONDecodeError):
|
||||
pass
|
||||
return []
|
||||
|
||||
|
||||
def _save_pending(entries: list[dict]) -> None:
|
||||
path = _pending_file()
|
||||
try:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp = path.with_suffix(".json.tmp")
|
||||
tmp.write_text(json.dumps(entries, indent=2), encoding="utf-8")
|
||||
os.replace(tmp, path)
|
||||
except OSError:
|
||||
# Non-fatal — worst case the user has to run ``hermes debug delete``
|
||||
# manually.
|
||||
pass
|
||||
|
||||
|
||||
def _record_pending(urls: list[str], delay_seconds: int = _AUTO_DELETE_SECONDS) -> None:
|
||||
"""Record *urls* for deletion at ``now + delay_seconds``.
|
||||
|
||||
Only paste.rs URLs are recorded (dpaste.com auto-expires). Entries
|
||||
are merged into any existing pending.json.
|
||||
"""
|
||||
paste_rs_urls = [u for u in urls if _extract_paste_id(u)]
|
||||
if not paste_rs_urls:
|
||||
return
|
||||
|
||||
entries = _load_pending()
|
||||
# Dedupe by URL: keep the later expire_at if same URL appears twice
|
||||
by_url: dict[str, float] = {e["url"]: float(e["expire_at"]) for e in entries}
|
||||
expire_at = time.time() + delay_seconds
|
||||
for u in paste_rs_urls:
|
||||
by_url[u] = max(expire_at, by_url.get(u, 0.0))
|
||||
merged = [{"url": u, "expire_at": ts} for u, ts in by_url.items()]
|
||||
_save_pending(merged)
|
||||
|
||||
|
||||
def _sweep_expired_pastes(now: Optional[float] = None) -> tuple[int, int]:
|
||||
"""Synchronously DELETE any pending pastes whose ``expire_at`` has passed.
|
||||
|
||||
Returns ``(deleted, remaining)``. Best-effort: failed deletes stay in
|
||||
the pending file and will be retried on the next sweep. Silent —
|
||||
intended to be called from every ``hermes debug`` invocation with
|
||||
minimal noise.
|
||||
"""
|
||||
entries = _load_pending()
|
||||
if not entries:
|
||||
return (0, 0)
|
||||
|
||||
current = time.time() if now is None else now
|
||||
deleted = 0
|
||||
remaining: list[dict] = []
|
||||
|
||||
for entry in entries:
|
||||
try:
|
||||
expire_at = float(entry.get("expire_at", 0))
|
||||
except (TypeError, ValueError):
|
||||
continue # drop malformed entries
|
||||
if expire_at > current:
|
||||
remaining.append(entry)
|
||||
continue
|
||||
|
||||
url = entry.get("url", "")
|
||||
try:
|
||||
if delete_paste(url):
|
||||
deleted += 1
|
||||
continue
|
||||
except Exception:
|
||||
# Network hiccup, 404 (already gone), etc. — drop the entry
|
||||
# after a grace period; don't retry forever.
|
||||
pass
|
||||
|
||||
# Retain failed deletes for up to 24h past expiration, then give up.
|
||||
if expire_at + 86400 > current:
|
||||
remaining.append(entry)
|
||||
else:
|
||||
deleted += 1 # count as reaped (paste.rs will GC eventually)
|
||||
|
||||
if deleted:
|
||||
_save_pending(remaining)
|
||||
|
||||
return (deleted, len(remaining))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Privacy / delete helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
@ -90,37 +206,19 @@ def delete_paste(url: str) -> bool:
|
|||
|
||||
|
||||
def _schedule_auto_delete(urls: list[str], delay_seconds: int = _AUTO_DELETE_SECONDS):
|
||||
"""Spawn a detached process to delete paste.rs pastes after *delay_seconds*.
|
||||
"""Record *urls* for deletion ``delay_seconds`` from now.
|
||||
|
||||
The child process is fully detached (``start_new_session=True``) so it
|
||||
survives the parent exiting (important for CLI mode). Only paste.rs
|
||||
URLs are attempted — dpaste.com pastes auto-expire on their own.
|
||||
Previously this spawned a detached Python subprocess per call that slept
|
||||
for 6 hours and then issued DELETE requests. Those subprocesses leaked —
|
||||
every ``hermes debug share`` invocation added ~20 MB of resident Python
|
||||
interpreters that never exited until the sleep completed.
|
||||
|
||||
The replacement is stateless: we append to ``~/.hermes/pastes/pending.json``
|
||||
and rely on opportunistic sweeps (``_sweep_expired_pastes``) called from
|
||||
every ``hermes debug`` invocation. If the user never runs ``hermes debug``
|
||||
again, paste.rs's own retention policy handles cleanup.
|
||||
"""
|
||||
import subprocess
|
||||
|
||||
paste_rs_urls = [u for u in urls if _extract_paste_id(u)]
|
||||
if not paste_rs_urls:
|
||||
return
|
||||
|
||||
# Build a tiny inline Python script. No imports beyond stdlib.
|
||||
url_list = ", ".join(f'"{u}"' for u in paste_rs_urls)
|
||||
script = (
|
||||
"import time, urllib.request; "
|
||||
f"time.sleep({delay_seconds}); "
|
||||
f"[urllib.request.urlopen(urllib.request.Request(u, method='DELETE', "
|
||||
f"headers={{'User-Agent': 'hermes-agent/auto-delete'}}), timeout=15) "
|
||||
f"for u in [{url_list}]]"
|
||||
)
|
||||
|
||||
try:
|
||||
subprocess.Popen(
|
||||
[sys.executable, "-c", script],
|
||||
start_new_session=True,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
)
|
||||
except Exception:
|
||||
pass # Best-effort; manual delete still available.
|
||||
_record_pending(urls, delay_seconds=delay_seconds)
|
||||
|
||||
|
||||
def _delete_hint(url: str) -> str:
|
||||
|
|
@ -455,6 +553,16 @@ def run_debug_delete(args):
|
|||
|
||||
def run_debug(args):
|
||||
"""Route debug subcommands."""
|
||||
# Opportunistic sweep of expired pastes on every ``hermes debug`` call.
|
||||
# Replaces the old per-paste sleeping subprocess that used to leak as
|
||||
# one orphaned Python interpreter per scheduled deletion. Silent and
|
||||
# best-effort — any failure is swallowed so ``hermes debug`` stays
|
||||
# reliable even when offline.
|
||||
try:
|
||||
_sweep_expired_pastes()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
subcmd = getattr(args, "debug_command", None)
|
||||
if subcmd == "share":
|
||||
run_debug_share(args)
|
||||
|
|
|
|||
|
|
@ -501,40 +501,272 @@ class TestDeletePaste:
|
|||
|
||||
|
||||
class TestScheduleAutoDelete:
|
||||
def test_spawns_detached_process(self):
|
||||
"""``_schedule_auto_delete`` used to spawn a detached Python subprocess
|
||||
per call (one per paste URL batch). Those subprocesses slept 6 hours
|
||||
and accumulated forever under repeated use — 15+ orphaned interpreters
|
||||
were observed in production.
|
||||
|
||||
The new implementation is stateless: it records pending deletions to
|
||||
``~/.hermes/pastes/pending.json`` and lets ``_sweep_expired_pastes``
|
||||
handle the DELETE requests synchronously on the next ``hermes debug``
|
||||
invocation.
|
||||
"""
|
||||
|
||||
def test_does_not_spawn_subprocess(self, hermes_home):
|
||||
"""Regression guard: _schedule_auto_delete must NEVER spawn subprocesses.
|
||||
|
||||
We assert this structurally rather than by mocking Popen: the new
|
||||
implementation doesn't even import ``subprocess`` at module scope,
|
||||
so a mock patch wouldn't find it.
|
||||
"""
|
||||
import ast
|
||||
import inspect
|
||||
from hermes_cli.debug import _schedule_auto_delete
|
||||
|
||||
with patch("subprocess.Popen") as mock_popen:
|
||||
_schedule_auto_delete(
|
||||
["https://paste.rs/abc", "https://paste.rs/def"],
|
||||
delay_seconds=10,
|
||||
)
|
||||
# Strip the docstring before scanning so the regression-rationale
|
||||
# prose inside it doesn't trigger our banned-word checks.
|
||||
source = inspect.getsource(_schedule_auto_delete)
|
||||
tree = ast.parse(source)
|
||||
func_node = tree.body[0]
|
||||
if (
|
||||
func_node.body
|
||||
and isinstance(func_node.body[0], ast.Expr)
|
||||
and isinstance(func_node.body[0].value, ast.Constant)
|
||||
and isinstance(func_node.body[0].value.value, str)
|
||||
):
|
||||
func_node.body = func_node.body[1:]
|
||||
code_only = ast.unparse(func_node)
|
||||
|
||||
mock_popen.assert_called_once()
|
||||
call_args = mock_popen.call_args
|
||||
# Verify detached
|
||||
assert call_args[1]["start_new_session"] is True
|
||||
# Verify the script references both URLs
|
||||
script = call_args[0][0][2] # [python, -c, script]
|
||||
assert "paste.rs/abc" in script
|
||||
assert "paste.rs/def" in script
|
||||
assert "time.sleep(10)" in script
|
||||
assert "Popen" not in code_only, (
|
||||
"_schedule_auto_delete must not spawn subprocesses — "
|
||||
"use pending.json + _sweep_expired_pastes instead"
|
||||
)
|
||||
assert "subprocess" not in code_only, (
|
||||
"_schedule_auto_delete must not reference subprocess at all"
|
||||
)
|
||||
assert "time.sleep" not in code_only, (
|
||||
"Regression: sleeping in _schedule_auto_delete is the bug being fixed"
|
||||
)
|
||||
|
||||
def test_skips_non_paste_rs_urls(self):
|
||||
from hermes_cli.debug import _schedule_auto_delete
|
||||
# And verify that calling it doesn't produce any orphaned children
|
||||
# (it should just write pending.json synchronously).
|
||||
import os as _os
|
||||
before = set(_os.listdir("/proc")) if _os.path.exists("/proc") else None
|
||||
_schedule_auto_delete(
|
||||
["https://paste.rs/abc", "https://paste.rs/def"],
|
||||
delay_seconds=10,
|
||||
)
|
||||
if before is not None:
|
||||
after = set(_os.listdir("/proc"))
|
||||
new = after - before
|
||||
# Filter to only integer-named entries (process PIDs)
|
||||
new_pids = [p for p in new if p.isdigit()]
|
||||
# It's fine if unrelated processes appeared — we just need to make
|
||||
# sure we didn't spawn a long-sleeping one. The old bug spawned
|
||||
# a python interpreter whose cmdline contained "time.sleep".
|
||||
for pid in new_pids:
|
||||
try:
|
||||
with open(f"/proc/{pid}/cmdline", "rb") as f:
|
||||
cmdline = f.read().decode("utf-8", errors="replace")
|
||||
assert "time.sleep" not in cmdline, (
|
||||
f"Leaked sleeper subprocess PID {pid}: {cmdline}"
|
||||
)
|
||||
except OSError:
|
||||
pass # process exited already
|
||||
|
||||
with patch("subprocess.Popen") as mock_popen:
|
||||
_schedule_auto_delete(["https://dpaste.com/something"])
|
||||
def test_records_pending_to_json(self, hermes_home):
|
||||
"""Scheduled URLs are persisted to pending.json with expiration."""
|
||||
from hermes_cli.debug import _schedule_auto_delete, _pending_file
|
||||
import json
|
||||
|
||||
mock_popen.assert_not_called()
|
||||
_schedule_auto_delete(
|
||||
["https://paste.rs/abc", "https://paste.rs/def"],
|
||||
delay_seconds=10,
|
||||
)
|
||||
|
||||
def test_handles_popen_failure_gracefully(self):
|
||||
from hermes_cli.debug import _schedule_auto_delete
|
||||
pending_path = _pending_file()
|
||||
assert pending_path.exists()
|
||||
|
||||
with patch("subprocess.Popen",
|
||||
side_effect=OSError("no such file")):
|
||||
# Should not raise
|
||||
_schedule_auto_delete(["https://paste.rs/abc"])
|
||||
entries = json.loads(pending_path.read_text())
|
||||
assert len(entries) == 2
|
||||
urls = {e["url"] for e in entries}
|
||||
assert urls == {"https://paste.rs/abc", "https://paste.rs/def"}
|
||||
|
||||
# expire_at is ~now + delay_seconds
|
||||
import time
|
||||
for e in entries:
|
||||
assert e["expire_at"] > time.time()
|
||||
assert e["expire_at"] <= time.time() + 15
|
||||
|
||||
def test_skips_non_paste_rs_urls(self, hermes_home):
|
||||
"""dpaste.com URLs auto-expire — don't track them."""
|
||||
from hermes_cli.debug import _schedule_auto_delete, _pending_file
|
||||
|
||||
_schedule_auto_delete(["https://dpaste.com/something"])
|
||||
|
||||
# pending.json should not be created for non-paste.rs URLs
|
||||
assert not _pending_file().exists()
|
||||
|
||||
def test_merges_with_existing_pending(self, hermes_home):
|
||||
"""Subsequent calls merge into existing pending.json."""
|
||||
from hermes_cli.debug import _schedule_auto_delete, _load_pending
|
||||
|
||||
_schedule_auto_delete(["https://paste.rs/first"], delay_seconds=10)
|
||||
_schedule_auto_delete(["https://paste.rs/second"], delay_seconds=10)
|
||||
|
||||
entries = _load_pending()
|
||||
urls = {e["url"] for e in entries}
|
||||
assert urls == {"https://paste.rs/first", "https://paste.rs/second"}
|
||||
|
||||
def test_dedupes_same_url(self, hermes_home):
|
||||
"""Same URL recorded twice → one entry with the later expire_at."""
|
||||
from hermes_cli.debug import _schedule_auto_delete, _load_pending
|
||||
|
||||
_schedule_auto_delete(["https://paste.rs/dup"], delay_seconds=10)
|
||||
_schedule_auto_delete(["https://paste.rs/dup"], delay_seconds=100)
|
||||
|
||||
entries = _load_pending()
|
||||
assert len(entries) == 1
|
||||
assert entries[0]["url"] == "https://paste.rs/dup"
|
||||
|
||||
|
||||
class TestSweepExpiredPastes:
|
||||
"""Test the opportunistic sweep that replaces the sleeping subprocess."""
|
||||
|
||||
def test_sweep_empty_is_noop(self, hermes_home):
|
||||
from hermes_cli.debug import _sweep_expired_pastes
|
||||
|
||||
deleted, remaining = _sweep_expired_pastes()
|
||||
assert deleted == 0
|
||||
assert remaining == 0
|
||||
|
||||
def test_sweep_deletes_expired_entries(self, hermes_home):
|
||||
from hermes_cli.debug import (
|
||||
_sweep_expired_pastes,
|
||||
_save_pending,
|
||||
_load_pending,
|
||||
)
|
||||
import time
|
||||
|
||||
# Seed pending.json with one expired + one future entry
|
||||
_save_pending([
|
||||
{"url": "https://paste.rs/expired", "expire_at": time.time() - 100},
|
||||
{"url": "https://paste.rs/future", "expire_at": time.time() + 3600},
|
||||
])
|
||||
|
||||
delete_calls = []
|
||||
|
||||
def fake_delete(url):
|
||||
delete_calls.append(url)
|
||||
return True
|
||||
|
||||
with patch("hermes_cli.debug.delete_paste", side_effect=fake_delete):
|
||||
deleted, remaining = _sweep_expired_pastes()
|
||||
|
||||
assert delete_calls == ["https://paste.rs/expired"]
|
||||
assert deleted == 1
|
||||
assert remaining == 1
|
||||
|
||||
entries = _load_pending()
|
||||
urls = {e["url"] for e in entries}
|
||||
assert urls == {"https://paste.rs/future"}
|
||||
|
||||
def test_sweep_leaves_future_entries_alone(self, hermes_home):
|
||||
from hermes_cli.debug import _sweep_expired_pastes, _save_pending
|
||||
import time
|
||||
|
||||
_save_pending([
|
||||
{"url": "https://paste.rs/future1", "expire_at": time.time() + 3600},
|
||||
{"url": "https://paste.rs/future2", "expire_at": time.time() + 7200},
|
||||
])
|
||||
|
||||
with patch("hermes_cli.debug.delete_paste") as mock_delete:
|
||||
deleted, remaining = _sweep_expired_pastes()
|
||||
|
||||
mock_delete.assert_not_called()
|
||||
assert deleted == 0
|
||||
assert remaining == 2
|
||||
|
||||
def test_sweep_survives_network_failure(self, hermes_home):
|
||||
"""Failed DELETEs stay in pending.json until the 24h grace window."""
|
||||
from hermes_cli.debug import (
|
||||
_sweep_expired_pastes,
|
||||
_save_pending,
|
||||
_load_pending,
|
||||
)
|
||||
import time
|
||||
|
||||
_save_pending([
|
||||
{"url": "https://paste.rs/flaky", "expire_at": time.time() - 100},
|
||||
])
|
||||
|
||||
with patch(
|
||||
"hermes_cli.debug.delete_paste",
|
||||
side_effect=Exception("network down"),
|
||||
):
|
||||
deleted, remaining = _sweep_expired_pastes()
|
||||
|
||||
# Failure within 24h grace → kept for retry
|
||||
assert deleted == 0
|
||||
assert remaining == 1
|
||||
assert len(_load_pending()) == 1
|
||||
|
||||
def test_sweep_drops_entries_past_grace_window(self, hermes_home):
|
||||
"""After 24h past expiration, give up even on network failures."""
|
||||
from hermes_cli.debug import (
|
||||
_sweep_expired_pastes,
|
||||
_save_pending,
|
||||
_load_pending,
|
||||
)
|
||||
import time
|
||||
|
||||
# Expired 25 hours ago → past the 24h grace window
|
||||
very_old = time.time() - (25 * 3600)
|
||||
_save_pending([
|
||||
{"url": "https://paste.rs/ancient", "expire_at": very_old},
|
||||
])
|
||||
|
||||
with patch(
|
||||
"hermes_cli.debug.delete_paste",
|
||||
side_effect=Exception("network down"),
|
||||
):
|
||||
deleted, remaining = _sweep_expired_pastes()
|
||||
|
||||
assert deleted == 1
|
||||
assert remaining == 0
|
||||
assert _load_pending() == []
|
||||
|
||||
|
||||
class TestRunDebugSweepsOnInvocation:
|
||||
"""``run_debug`` must sweep expired pastes on every invocation."""
|
||||
|
||||
def test_run_debug_calls_sweep(self, hermes_home):
|
||||
from hermes_cli.debug import run_debug
|
||||
|
||||
args = MagicMock()
|
||||
args.debug_command = None # default → prints help
|
||||
|
||||
with patch("hermes_cli.debug._sweep_expired_pastes") as mock_sweep:
|
||||
run_debug(args)
|
||||
|
||||
mock_sweep.assert_called_once()
|
||||
|
||||
def test_run_debug_survives_sweep_failure(self, hermes_home, capsys):
|
||||
"""If the sweep throws, the subcommand still runs."""
|
||||
from hermes_cli.debug import run_debug
|
||||
|
||||
args = MagicMock()
|
||||
args.debug_command = None
|
||||
|
||||
with patch(
|
||||
"hermes_cli.debug._sweep_expired_pastes",
|
||||
side_effect=RuntimeError("boom"),
|
||||
):
|
||||
run_debug(args) # must not raise
|
||||
|
||||
# Default subcommand still printed help
|
||||
out = capsys.readouterr().out
|
||||
assert "Usage: hermes debug" in out
|
||||
|
||||
|
||||
class TestRunDebugDelete:
|
||||
|
|
|
|||
|
|
@ -28,12 +28,22 @@ def _isolate_sessions():
|
|||
bt._active_sessions.update(orig)
|
||||
|
||||
|
||||
def _make_socket_dir(tmpdir, session_name, pid=None):
|
||||
"""Create a fake agent-browser socket directory with optional PID file."""
|
||||
def _make_socket_dir(tmpdir, session_name, pid=None, owner_pid=None):
|
||||
"""Create a fake agent-browser socket directory with optional PID files.
|
||||
|
||||
Args:
|
||||
tmpdir: base temp directory
|
||||
session_name: name like "h_abc1234567" or "cdp_abc1234567"
|
||||
pid: daemon PID to write to <session>.pid (None = no file)
|
||||
owner_pid: owning hermes PID to write to <session>.owner_pid
|
||||
(None = no file; tests the legacy path)
|
||||
"""
|
||||
d = tmpdir / f"agent-browser-{session_name}"
|
||||
d.mkdir()
|
||||
if pid is not None:
|
||||
(d / f"{session_name}.pid").write_text(str(pid))
|
||||
if owner_pid is not None:
|
||||
(d / f"{session_name}.owner_pid").write_text(str(owner_pid))
|
||||
return d
|
||||
|
||||
|
||||
|
|
@ -62,7 +72,10 @@ class TestReapOrphanedBrowserSessions:
|
|||
assert not d.exists()
|
||||
|
||||
def test_orphaned_alive_daemon_is_killed(self, fake_tmpdir):
|
||||
"""Alive daemon not tracked by _active_sessions gets SIGTERM."""
|
||||
"""Alive daemon not tracked by _active_sessions gets SIGTERM (legacy path).
|
||||
|
||||
No owner_pid file => falls back to tracked_names check.
|
||||
"""
|
||||
from tools.browser_tool import _reap_orphaned_browser_sessions
|
||||
|
||||
d = _make_socket_dir(fake_tmpdir, "h_orphan12345", pid=12345)
|
||||
|
|
@ -84,7 +97,7 @@ class TestReapOrphanedBrowserSessions:
|
|||
assert (12345, signal.SIGTERM) in kill_calls
|
||||
|
||||
def test_tracked_session_is_not_reaped(self, fake_tmpdir):
|
||||
"""Sessions tracked in _active_sessions are left alone."""
|
||||
"""Sessions tracked in _active_sessions are left alone (legacy path)."""
|
||||
import tools.browser_tool as bt
|
||||
from tools.browser_tool import _reap_orphaned_browser_sessions
|
||||
|
||||
|
|
@ -156,3 +169,240 @@ class TestReapOrphanedBrowserSessions:
|
|||
|
||||
_reap_orphaned_browser_sessions()
|
||||
assert not d.exists()
|
||||
|
||||
|
||||
class TestOwnerPidCrossProcess:
|
||||
"""Tests for owner_pid-based cross-process safe reaping.
|
||||
|
||||
The owner_pid file records which hermes process owns a daemon so that
|
||||
concurrent hermes processes don't reap each other's active browser
|
||||
sessions. Added to fix orphan accumulation from crashed processes.
|
||||
"""
|
||||
|
||||
def test_alive_owner_is_not_reaped_even_when_untracked(self, fake_tmpdir):
|
||||
"""Daemon with alive owner_pid is NOT reaped, even if not in our _active_sessions.
|
||||
|
||||
This is the core cross-process safety check: Process B scanning while
|
||||
Process A is using a browser must not kill A's daemon.
|
||||
"""
|
||||
from tools.browser_tool import _reap_orphaned_browser_sessions
|
||||
|
||||
# Use our own PID as the "owner" — guaranteed alive
|
||||
d = _make_socket_dir(
|
||||
fake_tmpdir, "h_alive_owner", pid=12345, owner_pid=os.getpid()
|
||||
)
|
||||
|
||||
kill_calls = []
|
||||
|
||||
def mock_kill(pid, sig):
|
||||
kill_calls.append((pid, sig))
|
||||
if pid == os.getpid() and sig == 0:
|
||||
return # real existence check: owner alive
|
||||
if sig == 0:
|
||||
return # pretend daemon exists too
|
||||
# Don't actually kill anything
|
||||
|
||||
with patch("os.kill", side_effect=mock_kill):
|
||||
_reap_orphaned_browser_sessions()
|
||||
|
||||
# We should have checked the owner (sig 0) but never tried to kill
|
||||
# the daemon.
|
||||
assert (12345, signal.SIGTERM) not in kill_calls
|
||||
# Dir should still exist
|
||||
assert d.exists()
|
||||
|
||||
def test_dead_owner_triggers_reap(self, fake_tmpdir):
|
||||
"""Daemon whose owner_pid is dead gets reaped."""
|
||||
from tools.browser_tool import _reap_orphaned_browser_sessions
|
||||
|
||||
# PID 999999999 almost certainly doesn't exist
|
||||
d = _make_socket_dir(
|
||||
fake_tmpdir, "h_dead_owner1", pid=12345, owner_pid=999999999
|
||||
)
|
||||
|
||||
kill_calls = []
|
||||
|
||||
def mock_kill(pid, sig):
|
||||
kill_calls.append((pid, sig))
|
||||
if pid == 999999999 and sig == 0:
|
||||
raise ProcessLookupError # owner dead
|
||||
if pid == 12345 and sig == 0:
|
||||
return # daemon still alive
|
||||
# SIGTERM to daemon — noop in test
|
||||
|
||||
with patch("os.kill", side_effect=mock_kill):
|
||||
_reap_orphaned_browser_sessions()
|
||||
|
||||
# Owner checked (returned dead), daemon checked (alive), daemon killed
|
||||
assert (999999999, 0) in kill_calls
|
||||
assert (12345, 0) in kill_calls
|
||||
assert (12345, signal.SIGTERM) in kill_calls
|
||||
# Dir cleaned up
|
||||
assert not d.exists()
|
||||
|
||||
def test_corrupt_owner_pid_falls_back_to_legacy(self, fake_tmpdir):
|
||||
"""Corrupt owner_pid file → fall back to tracked_names check."""
|
||||
import tools.browser_tool as bt
|
||||
from tools.browser_tool import _reap_orphaned_browser_sessions
|
||||
|
||||
session_name = "h_corrupt_own"
|
||||
d = _make_socket_dir(fake_tmpdir, session_name, pid=12345)
|
||||
# Write garbage to owner_pid file
|
||||
(d / f"{session_name}.owner_pid").write_text("not-a-pid")
|
||||
|
||||
# Register session so legacy fallback leaves it alone
|
||||
bt._active_sessions["task"] = {"session_name": session_name}
|
||||
|
||||
kill_calls = []
|
||||
|
||||
def mock_kill(pid, sig):
|
||||
kill_calls.append((pid, sig))
|
||||
|
||||
with patch("os.kill", side_effect=mock_kill):
|
||||
_reap_orphaned_browser_sessions()
|
||||
|
||||
# Legacy path took over → tracked → not reaped
|
||||
assert (12345, signal.SIGTERM) not in kill_calls
|
||||
assert d.exists()
|
||||
|
||||
def test_owner_pid_permission_error_treated_as_alive(self, fake_tmpdir):
|
||||
"""If os.kill(owner, 0) raises PermissionError, treat owner as alive.
|
||||
|
||||
PermissionError means the PID exists but is owned by a different user —
|
||||
we must not assume the owner is dead (could kill someone else's daemon).
|
||||
"""
|
||||
from tools.browser_tool import _reap_orphaned_browser_sessions
|
||||
|
||||
d = _make_socket_dir(
|
||||
fake_tmpdir, "h_perm_owner1", pid=12345, owner_pid=22222
|
||||
)
|
||||
|
||||
kill_calls = []
|
||||
|
||||
def mock_kill(pid, sig):
|
||||
kill_calls.append((pid, sig))
|
||||
if pid == 22222 and sig == 0:
|
||||
raise PermissionError("not our user")
|
||||
|
||||
with patch("os.kill", side_effect=mock_kill):
|
||||
_reap_orphaned_browser_sessions()
|
||||
|
||||
# Must NOT have tried to kill the daemon
|
||||
assert (12345, signal.SIGTERM) not in kill_calls
|
||||
assert d.exists()
|
||||
|
||||
def test_write_owner_pid_creates_file_with_current_pid(
|
||||
self, fake_tmpdir, monkeypatch
|
||||
):
|
||||
"""_write_owner_pid(dir, session) writes <session>.owner_pid with os.getpid()."""
|
||||
import tools.browser_tool as bt
|
||||
|
||||
session_name = "h_ownertest01"
|
||||
socket_dir = fake_tmpdir / f"agent-browser-{session_name}"
|
||||
socket_dir.mkdir()
|
||||
|
||||
bt._write_owner_pid(str(socket_dir), session_name)
|
||||
|
||||
owner_pid_file = socket_dir / f"{session_name}.owner_pid"
|
||||
assert owner_pid_file.exists()
|
||||
assert owner_pid_file.read_text().strip() == str(os.getpid())
|
||||
|
||||
def test_write_owner_pid_is_idempotent(self, fake_tmpdir):
|
||||
"""Calling _write_owner_pid twice leaves a single owner_pid file."""
|
||||
import tools.browser_tool as bt
|
||||
|
||||
session_name = "h_idempot1234"
|
||||
socket_dir = fake_tmpdir / f"agent-browser-{session_name}"
|
||||
socket_dir.mkdir()
|
||||
|
||||
bt._write_owner_pid(str(socket_dir), session_name)
|
||||
bt._write_owner_pid(str(socket_dir), session_name)
|
||||
|
||||
files = list(socket_dir.glob("*.owner_pid"))
|
||||
assert len(files) == 1
|
||||
assert files[0].read_text().strip() == str(os.getpid())
|
||||
|
||||
def test_write_owner_pid_swallows_oserror(self, fake_tmpdir, monkeypatch):
|
||||
"""OSError (e.g. permission denied) doesn't propagate — the reaper
|
||||
falls back to the legacy tracked_names heuristic in that case.
|
||||
"""
|
||||
import tools.browser_tool as bt
|
||||
|
||||
def raise_oserror(*a, **kw):
|
||||
raise OSError("permission denied")
|
||||
|
||||
monkeypatch.setattr("builtins.open", raise_oserror)
|
||||
|
||||
# Must not raise
|
||||
bt._write_owner_pid(str(fake_tmpdir), "h_readonly123")
|
||||
|
||||
def test_run_browser_command_calls_write_owner_pid(
|
||||
self, fake_tmpdir, monkeypatch
|
||||
):
|
||||
"""_run_browser_command wires _write_owner_pid after mkdir."""
|
||||
import tools.browser_tool as bt
|
||||
|
||||
session_name = "h_wiringtest1"
|
||||
|
||||
# Short-circuit Popen so we exit after the owner_pid write
|
||||
class _FakePopen:
|
||||
def __init__(self, *a, **kw):
|
||||
raise RuntimeError("short-circuit after owner_pid")
|
||||
|
||||
monkeypatch.setattr(bt.subprocess, "Popen", _FakePopen)
|
||||
monkeypatch.setattr(bt, "_find_agent_browser", lambda: "/bin/true")
|
||||
monkeypatch.setattr(
|
||||
bt, "_requires_real_termux_browser_install", lambda *a: False
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
bt, "_get_session_info",
|
||||
lambda task_id: {"session_name": session_name},
|
||||
)
|
||||
|
||||
calls = []
|
||||
orig_write = bt._write_owner_pid
|
||||
|
||||
def _spy(*a, **kw):
|
||||
calls.append(a)
|
||||
orig_write(*a, **kw)
|
||||
|
||||
monkeypatch.setattr(bt, "_write_owner_pid", _spy)
|
||||
|
||||
with patch("tools.browser_tool._socket_safe_tmpdir", return_value=str(fake_tmpdir)):
|
||||
try:
|
||||
bt._run_browser_command(task_id="test_task", command="goto", args=[])
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
assert calls, "_run_browser_command must call _write_owner_pid"
|
||||
# First positional arg is the socket_dir, second is the session_name
|
||||
socket_dir_arg, session_name_arg = calls[0][0], calls[0][1]
|
||||
assert session_name_arg == session_name
|
||||
assert session_name in socket_dir_arg
|
||||
|
||||
|
||||
class TestEmergencyCleanupRunsReaper:
|
||||
"""Verify atexit-registered cleanup sweeps orphans even without an active session."""
|
||||
|
||||
def test_emergency_cleanup_calls_reaper(self, fake_tmpdir, monkeypatch):
|
||||
"""_emergency_cleanup_all_sessions must call _reap_orphaned_browser_sessions."""
|
||||
import tools.browser_tool as bt
|
||||
|
||||
# Reset the _cleanup_done flag so the cleanup actually runs
|
||||
monkeypatch.setattr(bt, "_cleanup_done", False)
|
||||
|
||||
reaper_called = []
|
||||
orig_reaper = bt._reap_orphaned_browser_sessions
|
||||
|
||||
def _spy_reaper():
|
||||
reaper_called.append(True)
|
||||
orig_reaper()
|
||||
|
||||
monkeypatch.setattr(bt, "_reap_orphaned_browser_sessions", _spy_reaper)
|
||||
|
||||
# No active sessions — reaper should still run
|
||||
bt._emergency_cleanup_all_sessions()
|
||||
|
||||
assert reaper_called, (
|
||||
"Reaper must run on exit even with no active sessions"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -459,27 +459,38 @@ def _emergency_cleanup_all_sessions():
|
|||
"""
|
||||
Emergency cleanup of all active browser sessions.
|
||||
Called on process exit or interrupt to prevent orphaned sessions.
|
||||
|
||||
Also runs the orphan reaper to clean up daemons left behind by previously
|
||||
crashed hermes processes — this way every clean hermes exit sweeps
|
||||
accumulated orphans, not just ones that actively used the browser tool.
|
||||
"""
|
||||
global _cleanup_done
|
||||
if _cleanup_done:
|
||||
return
|
||||
_cleanup_done = True
|
||||
|
||||
if not _active_sessions:
|
||||
return
|
||||
|
||||
logger.info("Emergency cleanup: closing %s active session(s)...",
|
||||
len(_active_sessions))
|
||||
|
||||
# Clean up this process's own sessions first, so their owner_pid files
|
||||
# are removed before the reaper scans.
|
||||
if _active_sessions:
|
||||
logger.info("Emergency cleanup: closing %s active session(s)...",
|
||||
len(_active_sessions))
|
||||
try:
|
||||
cleanup_all_browsers()
|
||||
except Exception as e:
|
||||
logger.error("Emergency cleanup error: %s", e)
|
||||
finally:
|
||||
with _cleanup_lock:
|
||||
_active_sessions.clear()
|
||||
_session_last_activity.clear()
|
||||
_recording_sessions.clear()
|
||||
|
||||
# Sweep orphans from other crashed hermes processes. Safe even if we
|
||||
# never used the browser — uses owner_pid liveness to avoid reaping
|
||||
# daemons owned by other live hermes processes.
|
||||
try:
|
||||
cleanup_all_browsers()
|
||||
_reap_orphaned_browser_sessions()
|
||||
except Exception as e:
|
||||
logger.error("Emergency cleanup error: %s", e)
|
||||
finally:
|
||||
with _cleanup_lock:
|
||||
_active_sessions.clear()
|
||||
_session_last_activity.clear()
|
||||
_recording_sessions.clear()
|
||||
logger.debug("Orphan reap on exit failed: %s", e)
|
||||
|
||||
|
||||
# Register cleanup via atexit only. Previous versions installed SIGINT/SIGTERM
|
||||
|
|
@ -523,6 +534,24 @@ def _cleanup_inactive_browser_sessions():
|
|||
logger.warning("Error cleaning up inactive session %s: %s", task_id, e)
|
||||
|
||||
|
||||
def _write_owner_pid(socket_dir: str, session_name: str) -> None:
|
||||
"""Record the current hermes PID as the owner of a browser socket dir.
|
||||
|
||||
Written atomically to ``<socket_dir>/<session_name>.owner_pid`` so the
|
||||
orphan reaper can distinguish daemons owned by a live hermes process
|
||||
(don't reap) from daemons whose owner crashed (reap). Best-effort —
|
||||
an OSError here just falls back to the legacy ``tracked_names``
|
||||
heuristic in the reaper.
|
||||
"""
|
||||
try:
|
||||
path = os.path.join(socket_dir, f"{session_name}.owner_pid")
|
||||
with open(path, "w") as f:
|
||||
f.write(str(os.getpid()))
|
||||
except OSError as exc:
|
||||
logger.debug("Could not write owner_pid file for %s: %s",
|
||||
session_name, exc)
|
||||
|
||||
|
||||
def _reap_orphaned_browser_sessions():
|
||||
"""Scan for orphaned agent-browser daemon processes from previous runs.
|
||||
|
||||
|
|
@ -532,10 +561,19 @@ def _reap_orphaned_browser_sessions():
|
|||
|
||||
This function scans the tmp directory for ``agent-browser-*`` socket dirs
|
||||
left behind by previous runs, reads the daemon PID files, and kills any
|
||||
daemons that are still alive but not tracked by the current process.
|
||||
daemons whose owning hermes process is no longer alive.
|
||||
|
||||
Called once on cleanup-thread startup — not every 30 seconds — to avoid
|
||||
races with sessions being actively created.
|
||||
Ownership detection priority:
|
||||
1. ``<session>.owner_pid`` file (written by current code) — if the
|
||||
referenced hermes PID is alive, leave the daemon alone regardless
|
||||
of whether it's in *this* process's ``_active_sessions``. This is
|
||||
cross-process safe: two concurrent hermes instances won't reap each
|
||||
other's daemons.
|
||||
2. Fallback for daemons that predate owner_pid: check
|
||||
``_active_sessions`` in the current process. If not tracked here,
|
||||
treat as orphan (legacy behavior).
|
||||
|
||||
Safe to call from any context — atexit, cleanup thread, or on demand.
|
||||
"""
|
||||
import glob
|
||||
|
||||
|
|
@ -548,7 +586,7 @@ def _reap_orphaned_browser_sessions():
|
|||
if not socket_dirs:
|
||||
return
|
||||
|
||||
# Build set of session_names currently tracked by this process
|
||||
# Build set of session_names currently tracked by this process (fallback path)
|
||||
with _cleanup_lock:
|
||||
tracked_names = {
|
||||
info.get("session_name")
|
||||
|
|
@ -564,13 +602,38 @@ def _reap_orphaned_browser_sessions():
|
|||
if not session_name:
|
||||
continue
|
||||
|
||||
# Skip sessions that we are actively tracking
|
||||
if session_name in tracked_names:
|
||||
# Ownership check: prefer owner_pid file (cross-process safe).
|
||||
owner_pid_file = os.path.join(socket_dir, f"{session_name}.owner_pid")
|
||||
owner_alive: Optional[bool] = None # None = owner_pid missing/unreadable
|
||||
if os.path.isfile(owner_pid_file):
|
||||
try:
|
||||
owner_pid = int(Path(owner_pid_file).read_text().strip())
|
||||
try:
|
||||
os.kill(owner_pid, 0)
|
||||
owner_alive = True
|
||||
except ProcessLookupError:
|
||||
owner_alive = False
|
||||
except PermissionError:
|
||||
# Owner exists but we can't signal it (different uid).
|
||||
# Treat as alive — don't reap someone else's session.
|
||||
owner_alive = True
|
||||
except (ValueError, OSError):
|
||||
owner_alive = None # corrupt file — fall through
|
||||
|
||||
if owner_alive is True:
|
||||
# Owner is alive — this session belongs to a live hermes process.
|
||||
continue
|
||||
|
||||
if owner_alive is None:
|
||||
# No owner_pid file (legacy daemon). Fall back to in-process
|
||||
# tracking: if this process knows about the session, leave alone.
|
||||
if session_name in tracked_names:
|
||||
continue
|
||||
|
||||
# owner_alive is False (dead owner) OR legacy daemon not tracked here.
|
||||
pid_file = os.path.join(socket_dir, f"{session_name}.pid")
|
||||
if not os.path.isfile(pid_file):
|
||||
# No PID file — just a stale dir, remove it
|
||||
# No daemon PID file — just a stale dir, remove it
|
||||
shutil.rmtree(socket_dir, ignore_errors=True)
|
||||
continue
|
||||
|
||||
|
|
@ -591,7 +654,7 @@ def _reap_orphaned_browser_sessions():
|
|||
# Alive but owned by someone else — leave it alone
|
||||
continue
|
||||
|
||||
# Daemon is alive and not tracked — orphan. Kill it.
|
||||
# Daemon is alive and its owner is dead (or legacy + untracked). Reap.
|
||||
try:
|
||||
os.kill(daemon_pid, signal.SIGTERM)
|
||||
logger.info("Reaped orphaned browser daemon PID %d (session %s)",
|
||||
|
|
@ -1105,6 +1168,9 @@ def _run_browser_command(
|
|||
f"agent-browser-{session_info['session_name']}"
|
||||
)
|
||||
os.makedirs(task_socket_dir, mode=0o700, exist_ok=True)
|
||||
# Record this hermes PID as the session owner (cross-process safe
|
||||
# orphan detection — see _write_owner_pid).
|
||||
_write_owner_pid(task_socket_dir, session_info['session_name'])
|
||||
logger.debug("browser cmd=%s task=%s socket_dir=%s (%d chars)",
|
||||
command, task_id, task_socket_dir, len(task_socket_dir))
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue