From 3f43aec15d452d5b64d0ef71fdc71f8f1765fdc1 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Fri, 17 Apr 2026 15:53:57 -0700 Subject: [PATCH] fix(tools): bound _read_tracker sub-containers + prune _completion_consumed (#11839) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two accretion-over-time leaks that compound over long CLI / gateway lifetimes. Both were flagged in the memory-leak audit. ## file_tools._read_tracker _read_tracker[task_id] holds three sub-containers that grew unbounded: read_history set of (path, offset, limit) tuples — 1 per unique read dedup dict of (path, offset, limit) → mtime — same growth pattern read_timestamps dict of resolved_path → mtime — 1 per unique path A CLI session uses one stable task_id for its lifetime, so these were uncapped. A 10k-read session accumulated ~1.5MB of tracker state that the tool no longer needed (only the most recent reads are relevant for dedup, consecutive-loop detection, and write/patch external-edit warnings). Fix: _cap_read_tracker_data() enforces hard caps on each container after every add. Defaults: read_history=500, dedup=1000, read_timestamps=1000. Eviction is insertion-order (Python 3.7+ dict guarantee) for the dicts; arbitrary for the set (which only feeds diagnostic summaries). ## process_registry._completion_consumed Module-level set that recorded every session_id ever polled / waited / logged. No pruning. Each entry is ~20 bytes, so the absolute leak is small, but on a gateway processing thousands of background commands per day the set grows until process exit. Fix: _prune_if_needed() now discards _completion_consumed entries alongside the session dict evictions it already performs (both the TTL-based prune and the LRU-over-cap prune). Adds a final belt-and-suspenders pass that drops any dangling entries whose session_id no longer appears in _running or _finished. Tests: tests/tools/test_accretion_caps.py — 9 cases * Each container bound respected, oldest evicted * No-op when under cap (no unnecessary work) * Handles missing sub-containers without crashing * Live read_file_tool path enforces caps end-to-end * _completion_consumed pruned on TTL expiry * _completion_consumed pruned on LRU eviction * Dangling entries (no backing session) cleared Broader suite: 3486 tests/tools + tests/cli pass. The single flake (test_alias_command_passes_args) reproduces on unchanged main — known cross-test pollution under suite-order load. --- tests/tools/test_accretion_caps.py | 199 +++++++++++++++++++++++++++++ tools/file_tools.py | 57 +++++++++ tools/process_registry.py | 10 ++ 3 files changed, 266 insertions(+) create mode 100644 tests/tools/test_accretion_caps.py diff --git a/tests/tools/test_accretion_caps.py b/tests/tools/test_accretion_caps.py new file mode 100644 index 000000000..bdc9b41c3 --- /dev/null +++ b/tests/tools/test_accretion_caps.py @@ -0,0 +1,199 @@ +"""Accretion caps for _read_tracker (file_tools) and _completion_consumed +(process_registry). + +Both structures are process-lifetime singletons that previously grew +unbounded in long-running CLI / gateway sessions: + + file_tools._read_tracker[task_id] + ├─ read_history (set) — one entry per unique (path, offset, limit) + ├─ dedup (dict) — one entry per unique (path, offset, limit) + └─ read_timestamps (dict) — one entry per unique resolved path + process_registry._completion_consumed (set) — one entry per session_id + ever polled / waited / logged + +None of these were ever trimmed. A 10k-read CLI session accumulated +roughly 1.5MB of tracker state; a gateway with high background-process +churn accumulated ~20B per session_id until the process exited. + +These tests pin the new caps + prune hooks. +""" + +import pytest + + +class TestReadTrackerCaps: + def setup_method(self): + from tools import file_tools + + # Clean slate per test. + with file_tools._read_tracker_lock: + file_tools._read_tracker.clear() + + def test_read_history_capped(self, monkeypatch): + """read_history set is bounded by _READ_HISTORY_CAP.""" + from tools import file_tools as ft + + monkeypatch.setattr(ft, "_READ_HISTORY_CAP", 10) + task_data = { + "last_key": None, + "consecutive": 0, + "read_history": set((f"/p{i}", 0, 500) for i in range(50)), + "dedup": {}, + "read_timestamps": {}, + } + ft._cap_read_tracker_data(task_data) + assert len(task_data["read_history"]) == 10 + + def test_dedup_capped_oldest_first(self, monkeypatch): + """dedup dict is bounded; oldest entries evicted first.""" + from tools import file_tools as ft + + monkeypatch.setattr(ft, "_DEDUP_CAP", 5) + task_data = { + "read_history": set(), + "dedup": {(f"/p{i}", 0, 500): float(i) for i in range(20)}, + "read_timestamps": {}, + } + ft._cap_read_tracker_data(task_data) + assert len(task_data["dedup"]) == 5 + # Entries 15-19 (inserted last) should survive. + assert ("/p19", 0, 500) in task_data["dedup"] + assert ("/p15", 0, 500) in task_data["dedup"] + # Entries 0-14 should be evicted. + assert ("/p0", 0, 500) not in task_data["dedup"] + assert ("/p14", 0, 500) not in task_data["dedup"] + + def test_read_timestamps_capped_oldest_first(self, monkeypatch): + """read_timestamps dict is bounded; oldest entries evicted first.""" + from tools import file_tools as ft + + monkeypatch.setattr(ft, "_READ_TIMESTAMPS_CAP", 3) + task_data = { + "read_history": set(), + "dedup": {}, + "read_timestamps": {f"/path/{i}": float(i) for i in range(10)}, + } + ft._cap_read_tracker_data(task_data) + assert len(task_data["read_timestamps"]) == 3 + assert "/path/9" in task_data["read_timestamps"] + assert "/path/7" in task_data["read_timestamps"] + assert "/path/0" not in task_data["read_timestamps"] + + def test_cap_is_idempotent_under_cap(self, monkeypatch): + """When containers are under cap, _cap_read_tracker_data is a no-op.""" + from tools import file_tools as ft + + monkeypatch.setattr(ft, "_READ_HISTORY_CAP", 100) + monkeypatch.setattr(ft, "_DEDUP_CAP", 100) + monkeypatch.setattr(ft, "_READ_TIMESTAMPS_CAP", 100) + task_data = { + "read_history": {("/a", 0, 500), ("/b", 0, 500)}, + "dedup": {("/a", 0, 500): 1.0}, + "read_timestamps": {"/a": 1.0}, + } + rh_before = set(task_data["read_history"]) + dedup_before = dict(task_data["dedup"]) + ts_before = dict(task_data["read_timestamps"]) + + ft._cap_read_tracker_data(task_data) + + assert task_data["read_history"] == rh_before + assert task_data["dedup"] == dedup_before + assert task_data["read_timestamps"] == ts_before + + def test_cap_handles_missing_containers(self): + """Missing sub-keys don't cause AttributeError.""" + from tools import file_tools as ft + + ft._cap_read_tracker_data({}) # no containers at all + ft._cap_read_tracker_data({"read_history": None}) + ft._cap_read_tracker_data({"dedup": None}) + + def test_live_cap_applied_after_read_add(self, tmp_path, monkeypatch): + """Live read_file path enforces caps.""" + from tools import file_tools as ft + + monkeypatch.setattr(ft, "_READ_HISTORY_CAP", 3) + monkeypatch.setattr(ft, "_DEDUP_CAP", 3) + monkeypatch.setattr(ft, "_READ_TIMESTAMPS_CAP", 3) + + # Create 10 distinct files and read each once. + for i in range(10): + p = tmp_path / f"file_{i}.txt" + p.write_text(f"content {i}\n" * 10) + ft.read_file_tool(path=str(p), task_id="long-session") + + with ft._read_tracker_lock: + td = ft._read_tracker["long-session"] + assert len(td["read_history"]) <= 3 + assert len(td["dedup"]) <= 3 + assert len(td["read_timestamps"]) <= 3 + + +class TestCompletionConsumedPrune: + def test_prune_drops_completion_entry_with_expired_session(self): + """When a finished session is pruned, _completion_consumed is + cleared for the same session_id.""" + from tools.process_registry import ProcessRegistry, FINISHED_TTL_SECONDS + import time + + reg = ProcessRegistry() + # Fake a finished session whose started_at is older than the TTL. + class _FakeSess: + def __init__(self, sid): + self.id = sid + self.started_at = time.time() - (FINISHED_TTL_SECONDS + 100) + self.exited = True + + reg._finished["stale-1"] = _FakeSess("stale-1") + reg._completion_consumed.add("stale-1") + + with reg._lock: + reg._prune_if_needed() + + assert "stale-1" not in reg._finished + assert "stale-1" not in reg._completion_consumed + + def test_prune_drops_completion_entry_for_lru_evicted(self): + """Same contract for the LRU path (over MAX_PROCESSES).""" + from tools import process_registry as pr + import time + + reg = pr.ProcessRegistry() + + class _FakeSess: + def __init__(self, sid, started): + self.id = sid + self.started_at = started + self.exited = True + + # Fill above MAX_PROCESSES with recently-finished sessions. + now = time.time() + for i in range(pr.MAX_PROCESSES + 5): + sid = f"sess-{i}" + reg._finished[sid] = _FakeSess(sid, now - i) # sess-0 newest + reg._completion_consumed.add(sid) + + with reg._lock: + # _prune_if_needed removes one oldest finished per invocation; + # call it enough times to trim back down. + for _ in range(10): + reg._prune_if_needed() + + # The _completion_consumed set should not contain session IDs that + # are no longer in _running or _finished. + assert (reg._completion_consumed - (reg._running.keys() | reg._finished.keys())) == set() + + def test_prune_clears_dangling_completion_entries(self): + """Stale entries in _completion_consumed without a backing session + record are cleared out (belt-and-suspenders invariant).""" + from tools.process_registry import ProcessRegistry + + reg = ProcessRegistry() + # Add a dangling entry that was never in _running or _finished. + reg._completion_consumed.add("dangling-never-tracked") + + with reg._lock: + reg._prune_if_needed() + + assert "dangling-never-tracked" not in reg._completion_consumed diff --git a/tools/file_tools.py b/tools/file_tools.py index ca2118c33..cf6246dd0 100644 --- a/tools/file_tools.py +++ b/tools/file_tools.py @@ -148,6 +148,58 @@ _file_ops_cache: dict = {} _read_tracker_lock = threading.Lock() _read_tracker: dict = {} +# Per-task bounds for the containers inside each _read_tracker[task_id]. +# A CLI session uses one stable task_id for its lifetime; without these +# caps, a 10k-read session would accumulate ~1.5MB of dict/set state that +# is never referenced again (only the most recent reads matter for dedup, +# loop detection, and external-edit warnings). Hard caps bound the +# accretion to a few hundred KB regardless of session length. +_READ_HISTORY_CAP = 500 # set; used only by get_read_files_summary +_DEDUP_CAP = 1000 # dict; skip-identical-reread guard +_READ_TIMESTAMPS_CAP = 1000 # dict; external-edit detection for write/patch + + +def _cap_read_tracker_data(task_data: dict) -> None: + """Enforce size caps on the per-task read-tracker sub-containers. + + Must be called with ``_read_tracker_lock`` held. Eviction policy: + + * ``read_history`` (set): pop arbitrary entries on overflow. This + is fine because the set only feeds diagnostic summaries; losing + old entries just trims the summary's tail. + * ``dedup`` / ``read_timestamps`` (dict): pop oldest by insertion + order (Python 3.7+ dicts). Evicted entries lose their dedup + skip on a future re-read (the file gets re-sent once) and + external-edit mtime comparison (the write/patch falls back to + a non-mtime check). Both are graceful degradations, not bugs. + """ + rh = task_data.get("read_history") + if rh is not None and len(rh) > _READ_HISTORY_CAP: + excess = len(rh) - _READ_HISTORY_CAP + for _ in range(excess): + try: + rh.pop() + except KeyError: + break + + dedup = task_data.get("dedup") + if dedup is not None and len(dedup) > _DEDUP_CAP: + excess = len(dedup) - _DEDUP_CAP + for _ in range(excess): + try: + dedup.pop(next(iter(dedup))) + except (StopIteration, KeyError): + break + + ts = task_data.get("read_timestamps") + if ts is not None and len(ts) > _READ_TIMESTAMPS_CAP: + excess = len(ts) - _READ_TIMESTAMPS_CAP + for _ in range(excess): + try: + ts.pop(next(iter(ts))) + except (StopIteration, KeyError): + break + def _get_file_ops(task_id: str = "default") -> ShellFileOperations: """Get or create ShellFileOperations for a terminal environment. @@ -426,6 +478,10 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = except OSError: pass # Can't stat — skip tracking for this entry + # Bound the per-task containers so a long CLI session doesn't + # accumulate megabytes of dict/set state. See _cap_read_tracker_data. + _cap_read_tracker_data(task_data) + if count >= 4: # Hard block: stop returning content to break the loop return json.dumps({ @@ -505,6 +561,7 @@ def _update_read_timestamp(filepath: str, task_id: str) -> None: task_data = _read_tracker.get(task_id) if task_data is not None: task_data.setdefault("read_timestamps", {})[resolved] = current_mtime + _cap_read_tracker_data(task_data) def _check_file_staleness(filepath: str, task_id: str) -> str | None: diff --git a/tools/process_registry.py b/tools/process_registry.py index 2dbcdd150..d2cc56639 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -970,12 +970,22 @@ class ProcessRegistry: ] for sid in expired: del self._finished[sid] + self._completion_consumed.discard(sid) # If still over limit, remove oldest finished total = len(self._running) + len(self._finished) if total >= MAX_PROCESSES and self._finished: oldest_id = min(self._finished, key=lambda sid: self._finished[sid].started_at) del self._finished[oldest_id] + self._completion_consumed.discard(oldest_id) + + # Drop any _completion_consumed entries whose sessions are no longer + # tracked at all — belt-and-suspenders against module-lifetime growth + # on process-registry lookup paths that don't reach the dict prunes. + tracked = self._running.keys() | self._finished.keys() + stale = self._completion_consumed - tracked + if stale: + self._completion_consumed -= stale # ----- Checkpoint (crash recovery) -----