diff --git a/gateway/run.py b/gateway/run.py index 9b97968665..ca07254268 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -24,11 +24,20 @@ import signal import tempfile import threading import time +from collections import OrderedDict from contextvars import copy_context from pathlib import Path from datetime import datetime from typing import Dict, Optional, Any, List +# --- Agent cache tuning --------------------------------------------------- +# Bounds the per-session AIAgent cache to prevent unbounded growth in +# long-lived gateways (each AIAgent holds LLM clients, tool schemas, +# memory providers, etc.). LRU order + idle TTL eviction are enforced +# from _enforce_agent_cache_cap() and _session_expiry_watcher() below. +_AGENT_CACHE_MAX_SIZE = 128 +_AGENT_CACHE_IDLE_TTL_SECS = 3600.0 # evict agents idle for >1h + # --------------------------------------------------------------------------- # SSL certificate auto-detection for NixOS and other non-standard systems. # Must run BEFORE any HTTP library (discord, aiohttp, etc.) is imported. @@ -622,8 +631,13 @@ class GatewayRunner: # system prompt (including memory) every turn — breaking prefix cache # and costing ~10x more on providers with prompt caching (Anthropic). # Key: session_key, Value: (AIAgent, config_signature_str) + # + # OrderedDict so _enforce_agent_cache_cap() can pop the least-recently- + # used entry (move_to_end() on cache hits, popitem(last=False) for + # eviction). Hard cap via _AGENT_CACHE_MAX_SIZE, idle TTL enforced + # from _session_expiry_watcher(). import threading as _threading - self._agent_cache: Dict[str, tuple] = {} + self._agent_cache: "OrderedDict[str, tuple]" = OrderedDict() self._agent_cache_lock = _threading.Lock() # Per-session model overrides from /model command. @@ -2102,6 +2116,11 @@ class GatewayRunner: _cached_agent = self._running_agents.get(key) if _cached_agent and _cached_agent is not _AGENT_PENDING_SENTINEL: self._cleanup_agent_resources(_cached_agent) + # Drop the cache entry so the AIAgent (and its LLM + # clients, tool schemas, memory provider refs) can + # be garbage-collected. Otherwise the cache grows + # unbounded across the gateway's lifetime. + self._evict_cached_agent(key) # Mark as flushed and persist to disk so the flag # survives gateway restarts. with self.session_store._lock: @@ -2145,6 +2164,20 @@ class GatewayRunner: logger.info( "Session expiry done: %d flushed", _flushed, ) + + # Sweep agents that have been idle beyond the TTL regardless + # of session reset policy. This catches sessions with very + # long / "never" reset windows, whose cached AIAgents would + # otherwise pin memory for the gateway's entire lifetime. + try: + _idle_evicted = self._sweep_idle_cached_agents() + if _idle_evicted: + logger.info( + "Agent cache idle sweep: evicted %d agent(s)", + _idle_evicted, + ) + except Exception as _e: + logger.debug("Idle agent sweep failed: %s", _e) except Exception as e: logger.debug("Session expiry watcher error: %s", e) # Sleep in small increments so we can stop quickly @@ -7887,6 +7920,153 @@ class GatewayRunner: with _lock: self._agent_cache.pop(session_key, None) + def _release_evicted_agent_soft(self, agent: Any) -> None: + """Soft cleanup for cache-evicted agents — preserves session tool state. + + Called from _enforce_agent_cache_cap and _sweep_idle_cached_agents. 
+        Distinct from _cleanup_agent_resources (full teardown) because a
+        cache-evicted session may resume at any time — its terminal
+        sandbox, browser daemon, and tracked bg processes must outlive
+        the Python AIAgent instance so the next agent built for the
+        same task_id inherits them.
+        """
+        if agent is None:
+            return
+        try:
+            if hasattr(agent, "release_clients"):
+                agent.release_clients()
+            else:
+                # Older agent instance (shouldn't happen in practice) —
+                # fall back to the legacy full-close path.
+                self._cleanup_agent_resources(agent)
+        except Exception:
+            pass
+
+    def _enforce_agent_cache_cap(self) -> None:
+        """Evict oldest cached agents when cache exceeds _AGENT_CACHE_MAX_SIZE.
+
+        Must be called with _agent_cache_lock held. Resource cleanup
+        (soft client release via _release_evicted_agent_soft) is
+        scheduled on a daemon thread so the caller doesn't block on
+        slow teardown while holding the cache lock.
+
+        Agents currently in _running_agents are SKIPPED — their clients,
+        terminal sandboxes, background processes, and child subagents
+        are all in active use by the running turn. Evicting them would
+        tear down those resources mid-turn and crash the request. If
+        every candidate in the LRU order is active, we simply leave the
+        cache over the cap; it will be re-checked on the next insert.
+        """
+        _cache = getattr(self, "_agent_cache", None)
+        if _cache is None:
+            return
+        # LRU bookkeeping relies on OrderedDict.move_to_end(); if a test
+        # fixture swapped in a plain dict, skip enforcement entirely.
+        if not hasattr(_cache, "move_to_end"):
+            return
+
+        # Snapshot of agent instances that are actively mid-turn. Use id()
+        # so the lookup is O(1) and doesn't depend on AIAgent.__eq__ (which
+        # MagicMock overrides in tests).
+        running_ids = {
+            id(a)
+            for a in getattr(self, "_running_agents", {}).values()
+            if a is not None and a is not _AGENT_PENDING_SENTINEL
+        }
+
+        # Walk LRU → MRU and evict excess-LRU entries that aren't mid-turn.
+        # We only consider entries in the first (size - cap) LRU positions
+        # as eviction candidates. If one of those slots is held by an
+        # active agent, we SKIP it without compensating by evicting a
+        # newer entry — that would penalise a freshly-inserted session
+        # (which has no cache history to retain) while protecting an
+        # already-cached long-running one. The cache may therefore stay
+        # temporarily over cap; it will re-check on the next insert,
+        # after active turns have finished.
+        excess = max(0, len(_cache) - _AGENT_CACHE_MAX_SIZE)
+        evict_plan: List[tuple] = []  # [(key, agent), ...]
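+        # Worked example: with cap=3 and LRU→MRU keys [a, b, c, d, e],
+        # excess = 2, so only `a` and `b` are candidates. If `a` is
+        # mid-turn, only `b` is evicted and the cache settles one over
+        # cap until the next insert re-runs enforcement.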
+ if excess > 0: + ordered_keys = list(_cache.keys()) + for key in ordered_keys[:excess]: + entry = _cache.get(key) + agent = entry[0] if isinstance(entry, tuple) and entry else None + if agent is not None and id(agent) in running_ids: + continue # active mid-turn; don't evict, don't substitute + evict_plan.append((key, agent)) + + for key, _ in evict_plan: + _cache.pop(key, None) + + remaining_over_cap = len(_cache) - _AGENT_CACHE_MAX_SIZE + if remaining_over_cap > 0: + logger.warning( + "Agent cache over cap (%d > %d); %d excess slot(s) held by " + "mid-turn agents — will re-check on next insert.", + len(_cache), _AGENT_CACHE_MAX_SIZE, remaining_over_cap, + ) + + for key, agent in evict_plan: + logger.info( + "Agent cache at cap; evicting LRU session=%s (cache_size=%d)", + key, len(_cache), + ) + if agent is not None: + threading.Thread( + target=self._release_evicted_agent_soft, + args=(agent,), + daemon=True, + name=f"agent-cache-evict-{key[:24]}", + ).start() + + def _sweep_idle_cached_agents(self) -> int: + """Evict cached agents whose AIAgent has been idle > _AGENT_CACHE_IDLE_TTL_SECS. + + Safe to call from the session expiry watcher without holding the + cache lock — acquires it internally. Returns the number of entries + evicted. Resource cleanup is scheduled on daemon threads. + + Agents currently in _running_agents are SKIPPED for the same reason + as _enforce_agent_cache_cap: tearing down an active turn's clients + mid-flight would crash the request. + """ + _cache = getattr(self, "_agent_cache", None) + _lock = getattr(self, "_agent_cache_lock", None) + if _cache is None or _lock is None: + return 0 + now = time.time() + to_evict: List[tuple] = [] + running_ids = { + id(a) + for a in getattr(self, "_running_agents", {}).values() + if a is not None and a is not _AGENT_PENDING_SENTINEL + } + with _lock: + for key, entry in list(_cache.items()): + agent = entry[0] if isinstance(entry, tuple) and entry else None + if agent is None: + continue + if id(agent) in running_ids: + continue # mid-turn — don't tear it down + last_activity = getattr(agent, "_last_activity_ts", None) + if last_activity is None: + continue + if (now - last_activity) > _AGENT_CACHE_IDLE_TTL_SECS: + to_evict.append((key, agent)) + for key, _ in to_evict: + _cache.pop(key, None) + for key, agent in to_evict: + logger.info( + "Agent cache idle-TTL evict: session=%s (idle=%.0fs)", + key, now - getattr(agent, "_last_activity_ts", now), + ) + threading.Thread( + target=self._release_evicted_agent_soft, + args=(agent,), + daemon=True, + name=f"agent-cache-idle-{key[:24]}", + ).start() + return len(to_evict) + # ------------------------------------------------------------------ # Proxy mode: forward messages to a remote Hermes API server # ------------------------------------------------------------------ @@ -8654,6 +8834,13 @@ class GatewayRunner: cached = _cache.get(session_key) if cached and cached[1] == _sig: agent = cached[0] + # Refresh LRU order so the cap enforcement evicts + # truly-oldest entries, not the one we just used. + if hasattr(_cache, "move_to_end"): + try: + _cache.move_to_end(session_key) + except KeyError: + pass # Reset activity timestamp so the inactivity timeout # handler doesn't see stale idle time from the previous # turn and immediately kill this agent. 
(#9051) @@ -8692,6 +8879,7 @@ class GatewayRunner: if _cache_lock and _cache is not None: with _cache_lock: _cache[session_key] = (agent, _sig) + self._enforce_agent_cache_cap() logger.debug("Created new agent for session %s (sig=%s)", session_key, _sig) # Per-message state — callbacks and reasoning config change every diff --git a/run_agent.py b/run_agent.py index 325df9beb1..8db57a703e 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3242,6 +3242,53 @@ class AIAgent: except Exception: pass + def release_clients(self) -> None: + """Release LLM client resources WITHOUT tearing down session tool state. + + Used by the gateway when evicting this agent from _agent_cache for + memory-management reasons (LRU cap or idle TTL) — the session may + resume at any time with a freshly-built AIAgent that reuses the + same task_id / session_id, so we must NOT kill: + - process_registry entries for task_id (user's bg shells) + - terminal sandbox for task_id (cwd, env, shell state) + - browser daemon for task_id (open tabs, cookies) + - memory provider (has its own lifecycle; keeps running) + + We DO close: + - OpenAI/httpx client pool (big chunk of held memory + sockets; + the rebuilt agent gets a fresh client anyway) + - Active child subagents (per-turn artefacts; safe to drop) + + Safe to call multiple times. Distinct from close() — which is the + hard teardown for actual session boundaries (/new, /reset, session + expiry). + """ + # Close active child agents (per-turn; no cross-turn persistence). + try: + with self._active_children_lock: + children = list(self._active_children) + self._active_children.clear() + for child in children: + try: + child.release_clients() + except Exception: + # Fall back to full close on children; they're per-turn. + try: + child.close() + except Exception: + pass + except Exception: + pass + + # Close the OpenAI/httpx client to release sockets immediately. + try: + client = getattr(self, "client", None) + if client is not None: + self._close_openai_client(client, reason="cache_evict", shared=True) + self.client = None + except Exception: + pass + def close(self) -> None: """Release all resources held by this agent instance. 
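The soft/hard split above is the behavioural core of the change: release_clients() discards only the rebuildable LLM client pool, while close() also tears down the per-task tool state. A minimal standalone sketch of that contract, assuming hypothetical stand-ins (Agent, TaskState, and TASK_STATE are illustrative names, not code from this repo):

    # Illustrative sketch only; names below are NOT from the codebase.
    TASK_STATE: dict = {}  # per-task resources that must survive eviction

    class TaskState:
        """Stands in for the terminal sandbox / browser daemon / bg procs."""
        def __init__(self, task_id: str) -> None:
            self.task_id = task_id

    class Agent:
        def __init__(self, task_id: str) -> None:
            self.task_id = task_id
            self.client = object()  # stands in for the OpenAI/httpx client
            # Resume-friendly: reuse per-task state if it already exists.
            TASK_STATE.setdefault(task_id, TaskState(task_id))

        def release_clients(self) -> None:
            # Soft path (cache eviction): drop the client, keep task state.
            self.client = None

        def close(self) -> None:
            # Hard path (session boundary): tear down per-task state too.
            self.release_clients()
            TASK_STATE.pop(self.task_id, None)

    evicted = Agent("task-1")
    evicted.release_clients()
    assert "task-1" in TASK_STATE      # sandbox/browser/bg procs survive
    resumed = Agent("task-1")          # fresh client, inherited task state
    assert resumed.client is not None
    resumed.close()                    # real session boundary
    assert "task-1" not in TASK_STATE

The invariant the tests below pin down is exactly this: tool state is keyed by task_id rather than by the agent object, so agent instances are disposable while task state is not.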
diff --git a/tests/gateway/test_agent_cache.py b/tests/gateway/test_agent_cache.py index 761eb78d78..ae6c73ef7f 100644 --- a/tests/gateway/test_agent_cache.py +++ b/tests/gateway/test_agent_cache.py @@ -258,3 +258,785 @@ class TestAgentCacheLifecycle: cb3 = lambda *a: None agent.tool_progress_callback = cb3 assert agent.tool_progress_callback is cb3 + + +class TestAgentCacheBoundedGrowth: + """LRU cap and idle-TTL eviction prevent unbounded cache growth.""" + + def _bounded_runner(self): + """Runner with an OrderedDict cache (matches real gateway init).""" + from collections import OrderedDict + from gateway.run import GatewayRunner + + runner = GatewayRunner.__new__(GatewayRunner) + runner._agent_cache = OrderedDict() + runner._agent_cache_lock = threading.Lock() + return runner + + def _fake_agent(self, last_activity: float | None = None): + """Lightweight stand-in; real AIAgent is heavy to construct.""" + m = MagicMock() + if last_activity is not None: + m._last_activity_ts = last_activity + else: + import time as _t + m._last_activity_ts = _t.time() + return m + + def test_cap_evicts_lru_when_exceeded(self, monkeypatch): + """Inserting past _AGENT_CACHE_MAX_SIZE pops the oldest entry.""" + from gateway import run as gw_run + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 3) + runner = self._bounded_runner() + runner._cleanup_agent_resources = MagicMock() + + for i in range(3): + runner._agent_cache[f"s{i}"] = (self._fake_agent(), f"sig{i}") + + # Insert a 4th — oldest (s0) must be evicted. + with runner._agent_cache_lock: + runner._agent_cache["s3"] = (self._fake_agent(), "sig3") + runner._enforce_agent_cache_cap() + + assert "s0" not in runner._agent_cache + assert "s3" in runner._agent_cache + assert len(runner._agent_cache) == 3 + + def test_cap_respects_move_to_end(self, monkeypatch): + """Entries refreshed via move_to_end are NOT evicted as 'oldest'.""" + from gateway import run as gw_run + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 3) + runner = self._bounded_runner() + runner._cleanup_agent_resources = MagicMock() + + for i in range(3): + runner._agent_cache[f"s{i}"] = (self._fake_agent(), f"sig{i}") + + # Touch s0 — it is now MRU, so s1 becomes LRU. + runner._agent_cache.move_to_end("s0") + + with runner._agent_cache_lock: + runner._agent_cache["s3"] = (self._fake_agent(), "sig3") + runner._enforce_agent_cache_cap() + + assert "s0" in runner._agent_cache # rescued by move_to_end + assert "s1" not in runner._agent_cache # now oldest → evicted + assert "s3" in runner._agent_cache + + def test_cap_triggers_cleanup_thread(self, monkeypatch): + """Evicted agent has release_clients() called for it (soft cleanup). + + Uses the soft path (_release_evicted_agent_soft), NOT the hard + _cleanup_agent_resources — cache eviction must not tear down + per-task state (terminal/browser/bg procs). + """ + from gateway import run as gw_run + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1) + runner = self._bounded_runner() + + release_calls: list = [] + cleanup_calls: list = [] + # Intercept both paths; only release_clients path should fire. 
+ def _soft(agent): + release_calls.append(agent) + runner._release_evicted_agent_soft = _soft + runner._cleanup_agent_resources = lambda a: cleanup_calls.append(a) + + old_agent = self._fake_agent() + new_agent = self._fake_agent() + with runner._agent_cache_lock: + runner._agent_cache["old"] = (old_agent, "sig_old") + runner._agent_cache["new"] = (new_agent, "sig_new") + runner._enforce_agent_cache_cap() + + # Cleanup is dispatched to a daemon thread; join briefly to observe. + import time as _t + deadline = _t.time() + 2.0 + while _t.time() < deadline and not release_calls: + _t.sleep(0.02) + assert old_agent in release_calls + assert new_agent not in release_calls + # Hard-cleanup path must NOT have fired — that's for session expiry only. + assert cleanup_calls == [] + + def test_idle_ttl_sweep_evicts_stale_agents(self, monkeypatch): + """_sweep_idle_cached_agents removes agents idle past the TTL.""" + from gateway import run as gw_run + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.05) + runner = self._bounded_runner() + runner._cleanup_agent_resources = MagicMock() + + import time as _t + fresh = self._fake_agent(last_activity=_t.time()) + stale = self._fake_agent(last_activity=_t.time() - 10.0) + runner._agent_cache["fresh"] = (fresh, "s1") + runner._agent_cache["stale"] = (stale, "s2") + + evicted = runner._sweep_idle_cached_agents() + assert evicted == 1 + assert "stale" not in runner._agent_cache + assert "fresh" in runner._agent_cache + + def test_idle_sweep_skips_agents_without_activity_ts(self, monkeypatch): + """Agents missing _last_activity_ts are left alone (defensive).""" + from gateway import run as gw_run + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01) + runner = self._bounded_runner() + runner._cleanup_agent_resources = MagicMock() + + no_ts = MagicMock(spec=[]) # no _last_activity_ts attribute + runner._agent_cache["s"] = (no_ts, "sig") + + assert runner._sweep_idle_cached_agents() == 0 + assert "s" in runner._agent_cache + + def test_plain_dict_cache_is_tolerated(self): + """Test fixtures using plain {} don't crash _enforce_agent_cache_cap.""" + from gateway.run import GatewayRunner + + runner = GatewayRunner.__new__(GatewayRunner) + runner._agent_cache = {} # plain dict, not OrderedDict + runner._agent_cache_lock = threading.Lock() + runner._cleanup_agent_resources = MagicMock() + + # Should be a no-op rather than raising. + with runner._agent_cache_lock: + for i in range(200): + runner._agent_cache[f"s{i}"] = (MagicMock(), f"sig{i}") + runner._enforce_agent_cache_cap() # no crash, no eviction + + assert len(runner._agent_cache) == 200 + + def test_main_lookup_updates_lru_order(self, monkeypatch): + """Cache hit via the main-lookup path refreshes LRU position.""" + runner = self._bounded_runner() + + a0 = self._fake_agent() + a1 = self._fake_agent() + a2 = self._fake_agent() + runner._agent_cache["s0"] = (a0, "sig0") + runner._agent_cache["s1"] = (a1, "sig1") + runner._agent_cache["s2"] = (a2, "sig2") + + # Simulate what _process_message_background does on a cache hit + # (minus the agent-state reset which isn't relevant here). + with runner._agent_cache_lock: + cached = runner._agent_cache.get("s0") + if cached and hasattr(runner._agent_cache, "move_to_end"): + runner._agent_cache.move_to_end("s0") + + # After the hit, insertion order should be s1, s2, s0. + assert list(runner._agent_cache.keys()) == ["s1", "s2", "s0"] + + +class TestAgentCacheActiveSafety: + """Safety: eviction must not tear down agents currently mid-turn. 
+ + AIAgent.close() kills process_registry entries for the task, cleans + the terminal sandbox, closes the OpenAI client, and cascades + .close() into active child subagents. Calling it while the agent + is still processing would crash the in-flight request. These tests + pin that eviction skips any agent present in _running_agents. + """ + + def _runner(self): + from collections import OrderedDict + from gateway.run import GatewayRunner + + runner = GatewayRunner.__new__(GatewayRunner) + runner._agent_cache = OrderedDict() + runner._agent_cache_lock = threading.Lock() + runner._running_agents = {} + return runner + + def _fake_agent(self, idle_seconds: float = 0.0): + import time as _t + m = MagicMock() + m._last_activity_ts = _t.time() - idle_seconds + return m + + def test_cap_skips_active_lru_entry(self, monkeypatch): + """Active LRU entry is skipped; cache stays over cap rather than + compensating by evicting a newer entry. + + Rationale: evicting a more-recent entry just because the oldest + slot is temporarily locked would punish the most recently- + inserted session (which has no cache to preserve) to protect + one that happens to be mid-turn. Better to let the cache stay + transiently over cap and re-check on the next insert. + """ + from gateway import run as gw_run + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 2) + runner = self._runner() + runner._cleanup_agent_resources = MagicMock() + + active = self._fake_agent() + idle_a = self._fake_agent() + idle_b = self._fake_agent() + + # Insertion order: active (oldest), idle_a, idle_b. + runner._agent_cache["session-active"] = (active, "sig") + runner._agent_cache["session-idle-a"] = (idle_a, "sig") + runner._agent_cache["session-idle-b"] = (idle_b, "sig") + + # Mark `active` as mid-turn — it's LRU, but protected. + runner._running_agents["session-active"] = active + + with runner._agent_cache_lock: + runner._enforce_agent_cache_cap() + + # All three remain; no eviction ran, no cleanup dispatched. + assert "session-active" in runner._agent_cache + assert "session-idle-a" in runner._agent_cache + assert "session-idle-b" in runner._agent_cache + assert runner._cleanup_agent_resources.call_count == 0 + + def test_cap_evicts_when_multiple_excess_and_some_inactive(self, monkeypatch): + """Mixed active/idle in the LRU excess window: only the idle ones go. + + With CAP=2 and 4 entries, excess=2 (the two oldest). If the + oldest is active and the next is idle, we evict exactly one. + Cache ends at CAP+1, which is still better than unbounded. + """ + from gateway import run as gw_run + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 2) + runner = self._runner() + runner._cleanup_agent_resources = MagicMock() + + oldest_active = self._fake_agent() + idle_second = self._fake_agent() + idle_third = self._fake_agent() + idle_fourth = self._fake_agent() + + runner._agent_cache["s1"] = (oldest_active, "sig") + runner._agent_cache["s2"] = (idle_second, "sig") # in excess window, idle + runner._agent_cache["s3"] = (idle_third, "sig") + runner._agent_cache["s4"] = (idle_fourth, "sig") + + runner._running_agents["s1"] = oldest_active # oldest is mid-turn + + with runner._agent_cache_lock: + runner._enforce_agent_cache_cap() + + # s1 protected (active), s2 evicted (idle + in excess window), + # s3 and s4 untouched (outside excess window). 
+ assert "s1" in runner._agent_cache + assert "s2" not in runner._agent_cache + assert "s3" in runner._agent_cache + assert "s4" in runner._agent_cache + + def test_cap_leaves_cache_over_limit_if_all_active(self, monkeypatch, caplog): + """If every over-cap entry is mid-turn, the cache stays over cap. + + Better to temporarily exceed the cap than to crash an in-flight + turn by tearing down its clients. + """ + from gateway import run as gw_run + import logging as _logging + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1) + runner = self._runner() + runner._cleanup_agent_resources = MagicMock() + + a1 = self._fake_agent() + a2 = self._fake_agent() + a3 = self._fake_agent() + runner._agent_cache["s1"] = (a1, "sig") + runner._agent_cache["s2"] = (a2, "sig") + runner._agent_cache["s3"] = (a3, "sig") + + # All three are mid-turn. + runner._running_agents["s1"] = a1 + runner._running_agents["s2"] = a2 + runner._running_agents["s3"] = a3 + + with caplog.at_level(_logging.WARNING, logger="gateway.run"): + with runner._agent_cache_lock: + runner._enforce_agent_cache_cap() + + # Cache unchanged because eviction had to skip every candidate. + assert len(runner._agent_cache) == 3 + # _cleanup_agent_resources must NOT have been scheduled. + assert runner._cleanup_agent_resources.call_count == 0 + # And we logged a warning so operators can see the condition. + assert any("mid-turn" in r.message for r in caplog.records) + + def test_cap_pending_sentinel_does_not_block_eviction(self, monkeypatch): + """_AGENT_PENDING_SENTINEL in _running_agents is treated as 'not active'. + + The sentinel is set while an agent is being CONSTRUCTED, before the + real AIAgent instance exists. Cached agents from other sessions + can still be evicted safely. + """ + from gateway import run as gw_run + from gateway.run import _AGENT_PENDING_SENTINEL + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1) + runner = self._runner() + runner._cleanup_agent_resources = MagicMock() + + a1 = self._fake_agent() + a2 = self._fake_agent() + runner._agent_cache["s1"] = (a1, "sig") + runner._agent_cache["s2"] = (a2, "sig") + # Another session is mid-creation — sentinel, no real agent yet. + runner._running_agents["s3-being-created"] = _AGENT_PENDING_SENTINEL + + with runner._agent_cache_lock: + runner._enforce_agent_cache_cap() + + assert "s1" not in runner._agent_cache # evicted normally + assert "s2" in runner._agent_cache + + def test_idle_sweep_skips_active_agent(self, monkeypatch): + """Idle-TTL sweep must not tear down an active agent even if 'stale'.""" + from gateway import run as gw_run + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01) + runner = self._runner() + runner._cleanup_agent_resources = MagicMock() + + old_but_active = self._fake_agent(idle_seconds=10.0) + runner._agent_cache["s1"] = (old_but_active, "sig") + runner._running_agents["s1"] = old_but_active + + evicted = runner._sweep_idle_cached_agents() + + assert evicted == 0 + assert "s1" in runner._agent_cache + assert runner._cleanup_agent_resources.call_count == 0 + + def test_eviction_does_not_close_active_agent_client(self, monkeypatch): + """Live test: evicting an active agent does NOT null its .client. + + This reproduces the original concern — if eviction fired while an + agent was mid-turn, `agent.close()` would set `self.client = None` + and the next API call inside the loop would crash. With the + active-agent skip, the client stays intact. 
+ """ + from gateway import run as gw_run + + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1) + runner = self._runner() + + # Build a proper fake agent whose close() matches AIAgent's contract. + active = MagicMock() + active._last_activity_ts = __import__("time").time() + active.client = MagicMock() # simulate an OpenAI client + def _real_close(): + active.client = None # mirrors run_agent.py:3299 + active.close = _real_close + active.shutdown_memory_provider = MagicMock() + + idle = self._fake_agent() + + runner._agent_cache["active-session"] = (active, "sig") + runner._agent_cache["idle-session"] = (idle, "sig") + runner._running_agents["active-session"] = active + + # Real cleanup function, not mocked — we want to see whether close() + # runs on the active agent. (It shouldn't.) + with runner._agent_cache_lock: + runner._enforce_agent_cache_cap() + + # Let any eviction cleanup threads drain. + import time as _t + _t.sleep(0.2) + + # The ACTIVE agent's client must still be usable. + assert active.client is not None, ( + "Active agent's client was closed by eviction — " + "running turn would crash on its next API call." + ) + + +class TestAgentCacheSpilloverLive: + """Live E2E: fill cache with real AIAgent instances and stress it.""" + + def _runner(self): + from collections import OrderedDict + from gateway.run import GatewayRunner + + runner = GatewayRunner.__new__(GatewayRunner) + runner._agent_cache = OrderedDict() + runner._agent_cache_lock = threading.Lock() + runner._running_agents = {} + return runner + + def _real_agent(self): + """A genuine AIAgent; no API calls are made during these tests.""" + from run_agent import AIAgent + return AIAgent( + model="anthropic/claude-sonnet-4", api_key="test", + base_url="https://openrouter.ai/api/v1", provider="openrouter", + max_iterations=5, quiet_mode=True, + skip_context_files=True, skip_memory=True, + platform="telegram", + ) + + def test_fill_to_cap_then_spillover(self, monkeypatch): + """Fill to cap with real agents, insert one more, oldest evicted.""" + from gateway import run as gw_run + + CAP = 8 + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP) + runner = self._runner() + + agents = [self._real_agent() for _ in range(CAP)] + for i, a in enumerate(agents): + with runner._agent_cache_lock: + runner._agent_cache[f"s{i}"] = (a, "sig") + runner._enforce_agent_cache_cap() + assert len(runner._agent_cache) == CAP + + # Spillover insertion. + newcomer = self._real_agent() + with runner._agent_cache_lock: + runner._agent_cache["new"] = (newcomer, "sig") + runner._enforce_agent_cache_cap() + + # Oldest (s0) evicted, cap still CAP. + assert "s0" not in runner._agent_cache + assert "new" in runner._agent_cache + assert len(runner._agent_cache) == CAP + + # Clean up so pytest doesn't leak resources. 
+ for a in agents + [newcomer]: + try: + a.close() + except Exception: + pass + + def test_spillover_all_active_keeps_cache_over_cap(self, monkeypatch, caplog): + """Every slot active: cache goes over cap, no one gets torn down.""" + from gateway import run as gw_run + import logging as _logging + + CAP = 4 + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP) + runner = self._runner() + + agents = [self._real_agent() for _ in range(CAP)] + for i, a in enumerate(agents): + runner._agent_cache[f"s{i}"] = (a, "sig") + runner._running_agents[f"s{i}"] = a # every session mid-turn + + newcomer = self._real_agent() + with caplog.at_level(_logging.WARNING, logger="gateway.run"): + with runner._agent_cache_lock: + runner._agent_cache["new"] = (newcomer, "sig") + runner._enforce_agent_cache_cap() + + assert len(runner._agent_cache) == CAP + 1 # temporarily over cap + # All existing agents still usable. + for i, a in enumerate(agents): + assert a.client is not None, f"s{i} got closed while active!" + # And we warned operators. + assert any("mid-turn" in r.message for r in caplog.records) + + for a in agents + [newcomer]: + try: + a.close() + except Exception: + pass + + def test_concurrent_inserts_settle_at_cap(self, monkeypatch): + """Many threads inserting in parallel end with len(cache) == CAP.""" + from gateway import run as gw_run + + CAP = 16 + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP) + runner = self._runner() + + N_THREADS = 8 + PER_THREAD = 20 # 8 * 20 = 160 inserts into a 16-slot cache + + def worker(tid: int): + for j in range(PER_THREAD): + a = self._real_agent() + key = f"t{tid}-s{j}" + with runner._agent_cache_lock: + runner._agent_cache[key] = (a, "sig") + runner._enforce_agent_cache_cap() + + threads = [ + threading.Thread(target=worker, args=(t,), daemon=True) + for t in range(N_THREADS) + ] + for t in threads: + t.start() + for t in threads: + t.join(timeout=30) + assert not t.is_alive(), "Worker thread hung — possible deadlock?" + + # Let daemon cleanup threads settle. + import time as _t + _t.sleep(0.5) + + assert len(runner._agent_cache) == CAP, ( + f"Expected exactly {CAP} entries after concurrent inserts, " + f"got {len(runner._agent_cache)}." + ) + + def test_evicted_session_next_turn_gets_fresh_agent(self, monkeypatch): + """After eviction, the same session_key can insert a fresh agent. + + Simulates the real spillover flow: evicted session sends another + message, which builds a new AIAgent and re-enters the cache. + """ + from gateway import run as gw_run + + CAP = 2 + monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP) + runner = self._runner() + + a0 = self._real_agent() + a1 = self._real_agent() + runner._agent_cache["sA"] = (a0, "sig") + runner._agent_cache["sB"] = (a1, "sig") + + # 3rd session forces sA (oldest) out. + a2 = self._real_agent() + with runner._agent_cache_lock: + runner._agent_cache["sC"] = (a2, "sig") + runner._enforce_agent_cache_cap() + assert "sA" not in runner._agent_cache + + # Let the eviction cleanup thread run. + import time as _t + _t.sleep(0.3) + + # Now sA's user sends another message → a fresh agent goes in. + a0_new = self._real_agent() + with runner._agent_cache_lock: + runner._agent_cache["sA"] = (a0_new, "sig") + runner._enforce_agent_cache_cap() + + assert "sA" in runner._agent_cache + assert runner._agent_cache["sA"][0] is a0_new # the new one, not stale + # Fresh agent is usable. 
+ assert a0_new.client is not None + + for a in (a0, a1, a2, a0_new): + try: + a.close() + except Exception: + pass + + +class TestAgentCacheIdleResume: + """End-to-end: idle-TTL-evicted session resumes cleanly with task state. + + Real-world scenario: user leaves a Telegram session open for 2+ hours. + Idle-TTL evicts their cached agent. They come back and send a message. + The new agent built for the same session_id must inherit: + - Conversation history (from SessionStore — outside cache concern) + - Terminal sandbox (same task_id → same _active_environments entry) + - Browser daemon (same task_id → same browser session) + - Background processes (same task_id → same process_registry entries) + The ONLY thing that should reset is the LLM client pool (rebuilt fresh). + """ + + def _runner(self): + from collections import OrderedDict + from gateway.run import GatewayRunner + + runner = GatewayRunner.__new__(GatewayRunner) + runner._agent_cache = OrderedDict() + runner._agent_cache_lock = threading.Lock() + runner._running_agents = {} + return runner + + def test_release_clients_does_not_touch_process_registry(self, monkeypatch): + """release_clients must not call process_registry.kill_all for task_id.""" + from run_agent import AIAgent + + agent = AIAgent( + model="anthropic/claude-sonnet-4", api_key="test", + base_url="https://openrouter.ai/api/v1", provider="openrouter", + max_iterations=5, quiet_mode=True, + skip_context_files=True, skip_memory=True, + session_id="idle-resume-test-session", + ) + + # Spy on process_registry.kill_all — it MUST NOT be called. + from tools import process_registry as _pr + kill_all_calls: list = [] + original_kill_all = _pr.process_registry.kill_all + _pr.process_registry.kill_all = lambda **kw: kill_all_calls.append(kw) + try: + agent.release_clients() + finally: + _pr.process_registry.kill_all = original_kill_all + try: + agent.close() + except Exception: + pass + + assert kill_all_calls == [], ( + f"release_clients() called process_registry.kill_all — would " + f"kill user's bg processes on cache eviction. Calls: {kill_all_calls}" + ) + + def test_release_clients_does_not_touch_terminal_or_browser(self, monkeypatch): + """release_clients must not call cleanup_vm or cleanup_browser.""" + from run_agent import AIAgent + from tools import terminal_tool as _tt + from tools import browser_tool as _bt + + agent = AIAgent( + model="anthropic/claude-sonnet-4", api_key="test", + base_url="https://openrouter.ai/api/v1", provider="openrouter", + max_iterations=5, quiet_mode=True, + skip_context_files=True, skip_memory=True, + session_id="idle-resume-test-2", + ) + + vm_calls: list = [] + browser_calls: list = [] + original_vm = _tt.cleanup_vm + original_browser = _bt.cleanup_browser + _tt.cleanup_vm = lambda tid: vm_calls.append(tid) + _bt.cleanup_browser = lambda tid: browser_calls.append(tid) + try: + agent.release_clients() + finally: + _tt.cleanup_vm = original_vm + _bt.cleanup_browser = original_browser + try: + agent.close() + except Exception: + pass + + assert vm_calls == [], ( + f"release_clients() tore down terminal sandbox — user's cwd, " + f"env, and bg shells would be gone on resume. Calls: {vm_calls}" + ) + assert browser_calls == [], ( + f"release_clients() tore down browser session — user's open " + f"tabs and cookies gone on resume. 
Calls: {browser_calls}"
+        )
+
+    def test_release_clients_closes_llm_client(self):
+        """release_clients IS expected to close the OpenAI/httpx client."""
+        from run_agent import AIAgent
+
+        agent = AIAgent(
+            model="anthropic/claude-sonnet-4", api_key="test",
+            base_url="https://openrouter.ai/api/v1", provider="openrouter",
+            max_iterations=5, quiet_mode=True,
+            skip_context_files=True, skip_memory=True,
+        )
+        # __init__ builds the primary client eagerly; confirm it exists
+        # before we exercise the release path.
+        assert agent.client is not None
+
+        agent.release_clients()
+
+        # Post-release: client reference is dropped (memory freed).
+        assert agent.client is None
+
+    def test_close_vs_release_full_teardown_difference(self, monkeypatch):
+        """close() tears down task state; release_clients() does not.
+
+        This pins the semantic contract: session-expiry path uses close()
+        (full teardown — session is done), cache-eviction path uses
+        release_clients() (soft — session may resume).
+        """
+        from run_agent import AIAgent
+        from tools import terminal_tool as _tt
+
+        # Agent A: evicted from cache (soft) — terminal survives.
+        # Agent B: session expired (hard) — terminal torn down.
+        agent_a = AIAgent(
+            model="anthropic/claude-sonnet-4", api_key="test",
+            base_url="https://openrouter.ai/api/v1", provider="openrouter",
+            max_iterations=5, quiet_mode=True,
+            skip_context_files=True, skip_memory=True,
+            session_id="soft-session",
+        )
+        agent_b = AIAgent(
+            model="anthropic/claude-sonnet-4", api_key="test",
+            base_url="https://openrouter.ai/api/v1", provider="openrouter",
+            max_iterations=5, quiet_mode=True,
+            skip_context_files=True, skip_memory=True,
+            session_id="hard-session",
+        )
+
+        vm_calls: list = []
+        original_vm = _tt.cleanup_vm
+        _tt.cleanup_vm = lambda tid: vm_calls.append(tid)
+        try:
+            agent_a.release_clients()  # cache eviction
+            agent_b.close()            # session expiry
+        finally:
+            _tt.cleanup_vm = original_vm
+            try:
+                agent_a.close()
+            except Exception:
+                pass
+
+        # Only agent_b's task_id should appear in cleanup calls.
+        assert "hard-session" in vm_calls
+        assert "soft-session" not in vm_calls
+
+    def test_idle_evicted_session_rebuild_inherits_task_id(self, monkeypatch):
+        """After idle-TTL eviction, a fresh agent with the same session_id
+        gets the same task_id — so tool state (terminal/browser/bg procs)
+        that persisted across eviction is reachable via the new agent.
+        """
+        from gateway import run as gw_run
+        from run_agent import AIAgent
+
+        monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01)
+        runner = self._runner()
+
+        # Build an agent representing a stale (idle) session.
+        SESSION_ID = "long-lived-user-session"
+        old = AIAgent(
+            model="anthropic/claude-sonnet-4", api_key="test",
+            base_url="https://openrouter.ai/api/v1", provider="openrouter",
+            max_iterations=5, quiet_mode=True,
+            skip_context_files=True, skip_memory=True,
+            session_id=SESSION_ID,
+        )
+        old._last_activity_ts = 0.0  # force idle
+        runner._agent_cache["sKey"] = (old, "sig")
+
+        # Simulate the idle-TTL sweep firing.
+        runner._sweep_idle_cached_agents()
+        assert "sKey" not in runner._agent_cache
+
+        # Wait for the daemon thread doing release_clients() to finish.
+        import time as _t
+        _t.sleep(0.3)
+
+        # Old agent's client is gone (soft cleanup fired).
+        assert old.client is None
+
+        # User comes back — new agent built for the SAME session_id.
+ new_agent = AIAgent( + model="anthropic/claude-sonnet-4", api_key="test", + base_url="https://openrouter.ai/api/v1", provider="openrouter", + max_iterations=5, quiet_mode=True, + skip_context_files=True, skip_memory=True, + session_id=SESSION_ID, + ) + + # Same session_id means same task_id routed to tools. The new + # agent inherits any per-task state (terminal sandbox etc.) that + # was preserved across eviction. + assert new_agent.session_id == old.session_id == SESSION_ID + # And it has a fresh working client. + assert new_agent.client is not None + + try: + new_agent.close() + except Exception: + pass
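Taken together, the policy these tests pin down is a bounded LRU with an idle TTL that never evicts an in-flight entry. Reduced to a self-contained sketch (illustrative names only; CAP and IDLE_TTL play the roles of _AGENT_CACHE_MAX_SIZE and _AGENT_CACHE_IDLE_TTL_SECS, and `active` stands in for _running_agents):

    import time
    from collections import OrderedDict

    CAP = 3
    IDLE_TTL = 3600.0

    cache: "OrderedDict[str, dict]" = OrderedDict()  # key -> {"agent", "ts"}
    active: set = set()                              # keys currently mid-turn

    def touch(key: str) -> None:
        """Cache hit: refresh LRU position and activity timestamp."""
        cache.move_to_end(key)
        cache[key]["ts"] = time.time()

    def enforce_cap() -> None:
        """Evict only from the LRU excess window, skipping active keys."""
        excess = max(0, len(cache) - CAP)
        for key in list(cache)[:excess]:
            if key in active:
                continue  # mid-turn: tolerate over-cap, re-check later
            cache.pop(key)

    def sweep_idle() -> int:
        """Evict inactive entries idle beyond IDLE_TTL; returns count."""
        now = time.time()
        stale = [k for k, v in cache.items()
                 if k not in active and now - v["ts"] > IDLE_TTL]
        for k in stale:
            cache.pop(k)
        return len(stale)

    for i in range(5):
        cache[f"s{i}"] = {"agent": object(), "ts": time.time()}
    touch("s2")               # a cache hit makes s2 MRU
    active.add("s0")          # the oldest entry is mid-turn
    enforce_cap()
    assert "s0" in cache      # protected despite being LRU
    assert "s1" not in cache  # idle candidate in the excess window: evicted
    assert len(cache) == 4    # one over cap, tolerated until the next insert
    cache["s3"]["ts"] -= IDLE_TTL + 1  # age s3 past the idle TTL
    assert sweep_idle() == 1 and "s3" not in cache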