mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(tools): bound _read_tracker sub-containers + prune _completion_consumed (#11839)
Two accretion-over-time leaks that compound over long CLI / gateway lifetimes. Both were flagged in the memory-leak audit. ## file_tools._read_tracker _read_tracker[task_id] holds three sub-containers that grew unbounded: read_history set of (path, offset, limit) tuples — 1 per unique read dedup dict of (path, offset, limit) → mtime — same growth pattern read_timestamps dict of resolved_path → mtime — 1 per unique path A CLI session uses one stable task_id for its lifetime, so these were uncapped. A 10k-read session accumulated ~1.5MB of tracker state that the tool no longer needed (only the most recent reads are relevant for dedup, consecutive-loop detection, and write/patch external-edit warnings). Fix: _cap_read_tracker_data() enforces hard caps on each container after every add. Defaults: read_history=500, dedup=1000, read_timestamps=1000. Eviction is insertion-order (Python 3.7+ dict guarantee) for the dicts; arbitrary for the set (which only feeds diagnostic summaries). ## process_registry._completion_consumed Module-level set that recorded every session_id ever polled / waited / logged. No pruning. Each entry is ~20 bytes, so the absolute leak is small, but on a gateway processing thousands of background commands per day the set grows until process exit. Fix: _prune_if_needed() now discards _completion_consumed entries alongside the session dict evictions it already performs (both the TTL-based prune and the LRU-over-cap prune). Adds a final belt-and-suspenders pass that drops any dangling entries whose session_id no longer appears in _running or _finished. Tests: tests/tools/test_accretion_caps.py — 9 cases * Each container bound respected, oldest evicted * No-op when under cap (no unnecessary work) * Handles missing sub-containers without crashing * Live read_file_tool path enforces caps end-to-end * _completion_consumed pruned on TTL expiry * _completion_consumed pruned on LRU eviction * Dangling entries (no backing session) cleared Broader suite: 3486 tests/tools + tests/cli pass. The single flake (test_alias_command_passes_args) reproduces on unchanged main — known cross-test pollution under suite-order load.
This commit is contained in:
parent
0a83187801
commit
3f43aec15d
3 changed files with 266 additions and 0 deletions
199
tests/tools/test_accretion_caps.py
Normal file
199
tests/tools/test_accretion_caps.py
Normal file
|
|
@ -0,0 +1,199 @@
|
|||
"""Accretion caps for _read_tracker (file_tools) and _completion_consumed
|
||||
(process_registry).
|
||||
|
||||
Both structures are process-lifetime singletons that previously grew
|
||||
unbounded in long-running CLI / gateway sessions:
|
||||
|
||||
file_tools._read_tracker[task_id]
|
||||
├─ read_history (set) — one entry per unique (path, offset, limit)
|
||||
├─ dedup (dict) — one entry per unique (path, offset, limit)
|
||||
└─ read_timestamps (dict) — one entry per unique resolved path
|
||||
process_registry._completion_consumed (set) — one entry per session_id
|
||||
ever polled / waited / logged
|
||||
|
||||
None of these were ever trimmed. A 10k-read CLI session accumulated
|
||||
roughly 1.5MB of tracker state; a gateway with high background-process
|
||||
churn accumulated ~20B per session_id until the process exited.
|
||||
|
||||
These tests pin the new caps + prune hooks.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class TestReadTrackerCaps:
|
||||
def setup_method(self):
|
||||
from tools import file_tools
|
||||
|
||||
# Clean slate per test.
|
||||
with file_tools._read_tracker_lock:
|
||||
file_tools._read_tracker.clear()
|
||||
|
||||
def test_read_history_capped(self, monkeypatch):
|
||||
"""read_history set is bounded by _READ_HISTORY_CAP."""
|
||||
from tools import file_tools as ft
|
||||
|
||||
monkeypatch.setattr(ft, "_READ_HISTORY_CAP", 10)
|
||||
task_data = {
|
||||
"last_key": None,
|
||||
"consecutive": 0,
|
||||
"read_history": set((f"/p{i}", 0, 500) for i in range(50)),
|
||||
"dedup": {},
|
||||
"read_timestamps": {},
|
||||
}
|
||||
ft._cap_read_tracker_data(task_data)
|
||||
assert len(task_data["read_history"]) == 10
|
||||
|
||||
def test_dedup_capped_oldest_first(self, monkeypatch):
|
||||
"""dedup dict is bounded; oldest entries evicted first."""
|
||||
from tools import file_tools as ft
|
||||
|
||||
monkeypatch.setattr(ft, "_DEDUP_CAP", 5)
|
||||
task_data = {
|
||||
"read_history": set(),
|
||||
"dedup": {(f"/p{i}", 0, 500): float(i) for i in range(20)},
|
||||
"read_timestamps": {},
|
||||
}
|
||||
ft._cap_read_tracker_data(task_data)
|
||||
assert len(task_data["dedup"]) == 5
|
||||
# Entries 15-19 (inserted last) should survive.
|
||||
assert ("/p19", 0, 500) in task_data["dedup"]
|
||||
assert ("/p15", 0, 500) in task_data["dedup"]
|
||||
# Entries 0-14 should be evicted.
|
||||
assert ("/p0", 0, 500) not in task_data["dedup"]
|
||||
assert ("/p14", 0, 500) not in task_data["dedup"]
|
||||
|
||||
def test_read_timestamps_capped_oldest_first(self, monkeypatch):
|
||||
"""read_timestamps dict is bounded; oldest entries evicted first."""
|
||||
from tools import file_tools as ft
|
||||
|
||||
monkeypatch.setattr(ft, "_READ_TIMESTAMPS_CAP", 3)
|
||||
task_data = {
|
||||
"read_history": set(),
|
||||
"dedup": {},
|
||||
"read_timestamps": {f"/path/{i}": float(i) for i in range(10)},
|
||||
}
|
||||
ft._cap_read_tracker_data(task_data)
|
||||
assert len(task_data["read_timestamps"]) == 3
|
||||
assert "/path/9" in task_data["read_timestamps"]
|
||||
assert "/path/7" in task_data["read_timestamps"]
|
||||
assert "/path/0" not in task_data["read_timestamps"]
|
||||
|
||||
def test_cap_is_idempotent_under_cap(self, monkeypatch):
|
||||
"""When containers are under cap, _cap_read_tracker_data is a no-op."""
|
||||
from tools import file_tools as ft
|
||||
|
||||
monkeypatch.setattr(ft, "_READ_HISTORY_CAP", 100)
|
||||
monkeypatch.setattr(ft, "_DEDUP_CAP", 100)
|
||||
monkeypatch.setattr(ft, "_READ_TIMESTAMPS_CAP", 100)
|
||||
task_data = {
|
||||
"read_history": {("/a", 0, 500), ("/b", 0, 500)},
|
||||
"dedup": {("/a", 0, 500): 1.0},
|
||||
"read_timestamps": {"/a": 1.0},
|
||||
}
|
||||
rh_before = set(task_data["read_history"])
|
||||
dedup_before = dict(task_data["dedup"])
|
||||
ts_before = dict(task_data["read_timestamps"])
|
||||
|
||||
ft._cap_read_tracker_data(task_data)
|
||||
|
||||
assert task_data["read_history"] == rh_before
|
||||
assert task_data["dedup"] == dedup_before
|
||||
assert task_data["read_timestamps"] == ts_before
|
||||
|
||||
def test_cap_handles_missing_containers(self):
|
||||
"""Missing sub-keys don't cause AttributeError."""
|
||||
from tools import file_tools as ft
|
||||
|
||||
ft._cap_read_tracker_data({}) # no containers at all
|
||||
ft._cap_read_tracker_data({"read_history": None})
|
||||
ft._cap_read_tracker_data({"dedup": None})
|
||||
|
||||
def test_live_cap_applied_after_read_add(self, tmp_path, monkeypatch):
|
||||
"""Live read_file path enforces caps."""
|
||||
from tools import file_tools as ft
|
||||
|
||||
monkeypatch.setattr(ft, "_READ_HISTORY_CAP", 3)
|
||||
monkeypatch.setattr(ft, "_DEDUP_CAP", 3)
|
||||
monkeypatch.setattr(ft, "_READ_TIMESTAMPS_CAP", 3)
|
||||
|
||||
# Create 10 distinct files and read each once.
|
||||
for i in range(10):
|
||||
p = tmp_path / f"file_{i}.txt"
|
||||
p.write_text(f"content {i}\n" * 10)
|
||||
ft.read_file_tool(path=str(p), task_id="long-session")
|
||||
|
||||
with ft._read_tracker_lock:
|
||||
td = ft._read_tracker["long-session"]
|
||||
assert len(td["read_history"]) <= 3
|
||||
assert len(td["dedup"]) <= 3
|
||||
assert len(td["read_timestamps"]) <= 3
|
||||
|
||||
|
||||
class TestCompletionConsumedPrune:
|
||||
def test_prune_drops_completion_entry_with_expired_session(self):
|
||||
"""When a finished session is pruned, _completion_consumed is
|
||||
cleared for the same session_id."""
|
||||
from tools.process_registry import ProcessRegistry, FINISHED_TTL_SECONDS
|
||||
import time
|
||||
|
||||
reg = ProcessRegistry()
|
||||
# Fake a finished session whose started_at is older than the TTL.
|
||||
class _FakeSess:
|
||||
def __init__(self, sid):
|
||||
self.id = sid
|
||||
self.started_at = time.time() - (FINISHED_TTL_SECONDS + 100)
|
||||
self.exited = True
|
||||
|
||||
reg._finished["stale-1"] = _FakeSess("stale-1")
|
||||
reg._completion_consumed.add("stale-1")
|
||||
|
||||
with reg._lock:
|
||||
reg._prune_if_needed()
|
||||
|
||||
assert "stale-1" not in reg._finished
|
||||
assert "stale-1" not in reg._completion_consumed
|
||||
|
||||
def test_prune_drops_completion_entry_for_lru_evicted(self):
|
||||
"""Same contract for the LRU path (over MAX_PROCESSES)."""
|
||||
from tools import process_registry as pr
|
||||
import time
|
||||
|
||||
reg = pr.ProcessRegistry()
|
||||
|
||||
class _FakeSess:
|
||||
def __init__(self, sid, started):
|
||||
self.id = sid
|
||||
self.started_at = started
|
||||
self.exited = True
|
||||
|
||||
# Fill above MAX_PROCESSES with recently-finished sessions.
|
||||
now = time.time()
|
||||
for i in range(pr.MAX_PROCESSES + 5):
|
||||
sid = f"sess-{i}"
|
||||
reg._finished[sid] = _FakeSess(sid, now - i) # sess-0 newest
|
||||
reg._completion_consumed.add(sid)
|
||||
|
||||
with reg._lock:
|
||||
# _prune_if_needed removes one oldest finished per invocation;
|
||||
# call it enough times to trim back down.
|
||||
for _ in range(10):
|
||||
reg._prune_if_needed()
|
||||
|
||||
# The _completion_consumed set should not contain session IDs that
|
||||
# are no longer in _running or _finished.
|
||||
assert (reg._completion_consumed - (reg._running.keys() | reg._finished.keys())) == set()
|
||||
|
||||
def test_prune_clears_dangling_completion_entries(self):
|
||||
"""Stale entries in _completion_consumed without a backing session
|
||||
record are cleared out (belt-and-suspenders invariant)."""
|
||||
from tools.process_registry import ProcessRegistry
|
||||
|
||||
reg = ProcessRegistry()
|
||||
# Add a dangling entry that was never in _running or _finished.
|
||||
reg._completion_consumed.add("dangling-never-tracked")
|
||||
|
||||
with reg._lock:
|
||||
reg._prune_if_needed()
|
||||
|
||||
assert "dangling-never-tracked" not in reg._completion_consumed
|
||||
|
|
@ -148,6 +148,58 @@ _file_ops_cache: dict = {}
|
|||
_read_tracker_lock = threading.Lock()
|
||||
_read_tracker: dict = {}
|
||||
|
||||
# Per-task bounds for the containers inside each _read_tracker[task_id].
|
||||
# A CLI session uses one stable task_id for its lifetime; without these
|
||||
# caps, a 10k-read session would accumulate ~1.5MB of dict/set state that
|
||||
# is never referenced again (only the most recent reads matter for dedup,
|
||||
# loop detection, and external-edit warnings). Hard caps bound the
|
||||
# accretion to a few hundred KB regardless of session length.
|
||||
_READ_HISTORY_CAP = 500 # set; used only by get_read_files_summary
|
||||
_DEDUP_CAP = 1000 # dict; skip-identical-reread guard
|
||||
_READ_TIMESTAMPS_CAP = 1000 # dict; external-edit detection for write/patch
|
||||
|
||||
|
||||
def _cap_read_tracker_data(task_data: dict) -> None:
|
||||
"""Enforce size caps on the per-task read-tracker sub-containers.
|
||||
|
||||
Must be called with ``_read_tracker_lock`` held. Eviction policy:
|
||||
|
||||
* ``read_history`` (set): pop arbitrary entries on overflow. This
|
||||
is fine because the set only feeds diagnostic summaries; losing
|
||||
old entries just trims the summary's tail.
|
||||
* ``dedup`` / ``read_timestamps`` (dict): pop oldest by insertion
|
||||
order (Python 3.7+ dicts). Evicted entries lose their dedup
|
||||
skip on a future re-read (the file gets re-sent once) and
|
||||
external-edit mtime comparison (the write/patch falls back to
|
||||
a non-mtime check). Both are graceful degradations, not bugs.
|
||||
"""
|
||||
rh = task_data.get("read_history")
|
||||
if rh is not None and len(rh) > _READ_HISTORY_CAP:
|
||||
excess = len(rh) - _READ_HISTORY_CAP
|
||||
for _ in range(excess):
|
||||
try:
|
||||
rh.pop()
|
||||
except KeyError:
|
||||
break
|
||||
|
||||
dedup = task_data.get("dedup")
|
||||
if dedup is not None and len(dedup) > _DEDUP_CAP:
|
||||
excess = len(dedup) - _DEDUP_CAP
|
||||
for _ in range(excess):
|
||||
try:
|
||||
dedup.pop(next(iter(dedup)))
|
||||
except (StopIteration, KeyError):
|
||||
break
|
||||
|
||||
ts = task_data.get("read_timestamps")
|
||||
if ts is not None and len(ts) > _READ_TIMESTAMPS_CAP:
|
||||
excess = len(ts) - _READ_TIMESTAMPS_CAP
|
||||
for _ in range(excess):
|
||||
try:
|
||||
ts.pop(next(iter(ts)))
|
||||
except (StopIteration, KeyError):
|
||||
break
|
||||
|
||||
|
||||
def _get_file_ops(task_id: str = "default") -> ShellFileOperations:
|
||||
"""Get or create ShellFileOperations for a terminal environment.
|
||||
|
|
@ -426,6 +478,10 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
|
|||
except OSError:
|
||||
pass # Can't stat — skip tracking for this entry
|
||||
|
||||
# Bound the per-task containers so a long CLI session doesn't
|
||||
# accumulate megabytes of dict/set state. See _cap_read_tracker_data.
|
||||
_cap_read_tracker_data(task_data)
|
||||
|
||||
if count >= 4:
|
||||
# Hard block: stop returning content to break the loop
|
||||
return json.dumps({
|
||||
|
|
@ -505,6 +561,7 @@ def _update_read_timestamp(filepath: str, task_id: str) -> None:
|
|||
task_data = _read_tracker.get(task_id)
|
||||
if task_data is not None:
|
||||
task_data.setdefault("read_timestamps", {})[resolved] = current_mtime
|
||||
_cap_read_tracker_data(task_data)
|
||||
|
||||
|
||||
def _check_file_staleness(filepath: str, task_id: str) -> str | None:
|
||||
|
|
|
|||
|
|
@ -970,12 +970,22 @@ class ProcessRegistry:
|
|||
]
|
||||
for sid in expired:
|
||||
del self._finished[sid]
|
||||
self._completion_consumed.discard(sid)
|
||||
|
||||
# If still over limit, remove oldest finished
|
||||
total = len(self._running) + len(self._finished)
|
||||
if total >= MAX_PROCESSES and self._finished:
|
||||
oldest_id = min(self._finished, key=lambda sid: self._finished[sid].started_at)
|
||||
del self._finished[oldest_id]
|
||||
self._completion_consumed.discard(oldest_id)
|
||||
|
||||
# Drop any _completion_consumed entries whose sessions are no longer
|
||||
# tracked at all — belt-and-suspenders against module-lifetime growth
|
||||
# on process-registry lookup paths that don't reach the dict prunes.
|
||||
tracked = self._running.keys() | self._finished.keys()
|
||||
stale = self._completion_consumed - tracked
|
||||
if stale:
|
||||
self._completion_consumed -= stale
|
||||
|
||||
# ----- Checkpoint (crash recovery) -----
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue