fix(gateway): bound _agent_cache with LRU cap + idle TTL eviction (#11565)

* fix(gateway): bound _agent_cache with LRU cap + idle TTL eviction

The per-session AIAgent cache was unbounded. Each cached AIAgent holds
LLM clients, tool schemas, memory providers, and a conversation buffer.
In a long-lived gateway serving many chats/threads, cached agents
accumulated indefinitely — entries were only evicted on /new, /model,
or session reset.

Changes:
- Cache is now an OrderedDict so we can pop least-recently-used entries.
- _enforce_agent_cache_cap() pops entries beyond _AGENT_CACHE_MAX_SIZE=64
  when a new agent is inserted. LRU order is refreshed via move_to_end()
  on cache hits.
- _sweep_idle_cached_agents() evicts entries whose AIAgent has been idle
  longer than _AGENT_CACHE_IDLE_TTL_SECS=3600s. Runs from the existing
  _session_expiry_watcher so no new background task is created.
- The expiry watcher now also pops the cache entry after calling
  _cleanup_agent_resources on a flushed session — previously the agent
  was shut down but its reference stayed in the cache dict.
- Evicted agents have _cleanup_agent_resources() called on a daemon
  thread so the cache lock isn't held during slow teardown.

Both tuning constants live at module scope so tests can monkeypatch
them without touching class state.
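The OrderedDict mechanics the fix relies on — `move_to_end()` on a hit, `popitem(last=False)` for LRU eviction — can be shown with a toy cache (names here are illustrative, not the gateway's):

```python
from collections import OrderedDict

CAP = 3
cache: "OrderedDict[str, str]" = OrderedDict()

def put(key: str, value: str) -> None:
    cache[key] = value
    cache.move_to_end(key)                 # newest write becomes MRU
    while len(cache) > CAP:
        cache.popitem(last=False)          # pop from the LRU end

def get(key: str):
    value = cache.get(key)
    if value is not None:
        cache.move_to_end(key)             # refresh LRU position on hit
    return value

for k in ("s0", "s1", "s2"):
    put(k, k.upper())
get("s0")          # s0 becomes MRU, so s1 is now the LRU entry
put("s3", "S3")    # exceeds CAP: evicts s1, not the recently-used s0
assert list(cache) == ["s2", "s0", "s3"]
```

The same two calls appear in `_enforce_agent_cache_cap()` and the cache-hit path below; the toy omits locking and cleanup dispatch.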

Tests: 7 new cases in test_agent_cache.py covering LRU eviction,
move_to_end refresh, cleanup thread dispatch, idle TTL sweep,
defensive handling of agents without _last_activity_ts, and plain-dict
test fixture tolerance.

* tweak: bump _AGENT_CACHE_MAX_SIZE 64 -> 128

* fix(gateway): never evict mid-turn agents; live spillover tests

The prior commit could tear down an active agent if its session_key
happened to be LRU when the cap was exceeded.  AIAgent.close() kills
process_registry entries for the task, tears down the terminal
sandbox, closes the OpenAI client (sets self.client = None), and
cascades .close() into any active child subagents — all fatal if
the agent is still processing a turn.

Changes:
- _enforce_agent_cache_cap and _sweep_idle_cached_agents now look at
  GatewayRunner._running_agents and skip any entry whose AIAgent
  instance is present (identity via id(), so MagicMock doesn't
  confuse lookup in tests).  _AGENT_PENDING_SENTINEL is treated
  as 'not active' since no real agent exists yet.
- Eviction only considers the LRU-excess window (first size-cap
  entries).  If an excess slot is held by a mid-turn agent, we skip
  it WITHOUT compensating by evicting a newer entry.  A freshly
  inserted session (zero cache history) shouldn't be punished to
  protect a long-lived one that happens to be busy.
- Cache may therefore stay transiently over cap when load spikes;
  a WARNING is logged so operators can see it, and the next insert
  re-runs the check after some turns have finished.
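The skip-without-substitution rule above reduces to a few lines. This is a sketch only; the real `_enforce_agent_cache_cap` also tolerates plain-dict test fixtures and dispatches cleanup on daemon threads:

```python
from collections import OrderedDict

def plan_evictions(cache: OrderedDict, running_ids: set, cap: int) -> list:
    """Pick evictable keys from the LRU excess window only. Mid-turn
    agents (identity present in running_ids) are skipped WITHOUT
    substituting a newer entry, so the cache may transiently exceed cap."""
    keys = list(cache)                      # OrderedDict iterates LRU -> MRU
    excess = max(0, len(keys) - cap)
    return [k for k in keys[:excess]        # only the excess-LRU window
            if id(cache[k]) not in running_ids]

# Toy demo: cap=2, four entries, the oldest is mid-turn.
a, b, c, d = object(), object(), object(), object()
cache = OrderedDict([("s1", a), ("s2", b), ("s3", c), ("s4", d)])
plan = plan_evictions(cache, running_ids={id(a)}, cap=2)
assert plan == ["s2"]   # s1 skipped (active); s3/s4 are outside the window
```

Note the cache ends one over cap here, matching the "transiently over cap" behaviour described above.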

New tests (TestAgentCacheActiveSafety + TestAgentCacheSpilloverLive):
- Active LRU entry is skipped; no newer entry compensated
- Mixed active/idle excess window: only idle slots go
- All-active cache: no eviction, WARNING logged, all clients intact
- _AGENT_PENDING_SENTINEL doesn't block other evictions
- Idle-TTL sweep skips active agents
- End-to-end: active agent's .client survives eviction attempt
- Live fill-to-cap with real AIAgents, then spillover
- Live: CAP=4 all active + 1 newcomer — cache grows to 5, no teardown
- Live: 8 threads racing 160 inserts into CAP=16 — settles at 16
- Live: evicted session's next turn gets a fresh agent that works

30 tests pass (13 pre-existing + 17 new).  Related gateway suites
(model switch, session reset, proxy, etc.) all green.

* fix(gateway): cache eviction preserves per-task state for session resume

The prior commits called AIAgent.close() on cache-evicted agents, which
tears down process_registry entries, terminal sandbox, and browser
daemon for that task_id — permanently. Fine for session-expiry (session
ended), wrong for cache eviction (session may resume).

Real-world scenario: a user leaves a Telegram session open for 2+ hours,
idle TTL evicts the cached AIAgent, user returns and sends a message.
Conversation history is preserved via SessionStore, but their terminal
sandbox (cwd, env vars, bg shells) and browser state were destroyed.

Fix: split the two cleanup modes.

  close()               Full teardown — session ended. Kills bg procs,
                        tears down terminal sandbox + browser daemon,
                        closes LLM client. Used by session-expiry,
                        /new, /reset (unchanged).

  release_clients()     Soft cleanup — session may resume. Closes
                        LLM client only. Leaves process_registry,
                        terminal sandbox, browser daemon intact
                        for the resuming agent to inherit via
                        shared task_id.

Gateway cache eviction (_enforce_agent_cache_cap, _sweep_idle_cached_agents)
now dispatches _release_evicted_agent_soft on the daemon thread instead
of _cleanup_agent_resources. All session-expiry call sites of
_cleanup_agent_resources are unchanged.
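The contract between the two modes can be pinned with a toy class (a simplified sketch; attribute names mirror the description above, not the real AIAgent):

```python
class Agent:
    """Toy sketch of the close()/release_clients() split."""

    def __init__(self) -> None:
        self.client = object()    # LLM client pool (heavy; always safe to drop)
        self.sandbox = object()   # terminal sandbox, keyed by task_id
        self.browser = object()   # browser daemon, keyed by task_id

    def release_clients(self) -> None:
        # Soft path: session may resume. Only the client goes;
        # per-task state survives for the rebuilt agent to inherit.
        self.client = None

    def close(self) -> None:
        # Hard path: session ended. Everything goes.
        self.release_clients()
        self.sandbox = None
        self.browser = None

a = Agent()
a.release_clients()               # cache eviction (LRU cap / idle TTL)
assert a.client is None
assert a.sandbox is not None and a.browser is not None
a.close()                         # session expiry, /new, /reset
assert a.sandbox is None and a.browser is None
```

Safe-to-call-twice falls out for free: `release_clients()` is idempotent, and `close()` simply re-nulls already-cleared fields.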

Tests (TestAgentCacheIdleResume, 5 new cases):
- release_clients does NOT call process_registry.kill_all
- release_clients does NOT call cleanup_vm / cleanup_browser
- release_clients DOES close the LLM client (agent.client is None after)
- close() vs release_clients() — semantic contract pinned
- Idle-evicted session's rebuild with same session_id gets same task_id

Updated test_cap_triggers_cleanup_thread to assert the soft path fires
and the hard path does NOT.

35 tests pass in test_agent_cache.py; 67 related tests green.
Commit 8d7b7feb0d (parent fc04f83062) by Teknium, 2026-04-17 06:36:34 -07:00, committed by GitHub.
Signature: no known key found in database (GPG key ID: B5690EEEBB952194).
3 changed files with 1018 additions and 1 deletion.

@@ -24,11 +24,20 @@ import signal
import tempfile
import threading
import time
from collections import OrderedDict
from contextvars import copy_context
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional, Any, List
# --- Agent cache tuning ---------------------------------------------------
# Bounds the per-session AIAgent cache to prevent unbounded growth in
# long-lived gateways (each AIAgent holds LLM clients, tool schemas,
# memory providers, etc.). LRU order + idle TTL eviction are enforced
# from _enforce_agent_cache_cap() and _session_expiry_watcher() below.
_AGENT_CACHE_MAX_SIZE = 128
_AGENT_CACHE_IDLE_TTL_SECS = 3600.0 # evict agents idle for >1h
# ---------------------------------------------------------------------------
# SSL certificate auto-detection for NixOS and other non-standard systems.
# Must run BEFORE any HTTP library (discord, aiohttp, etc.) is imported.
@@ -622,8 +631,13 @@ class GatewayRunner:
# system prompt (including memory) every turn — breaking prefix cache
# and costing ~10x more on providers with prompt caching (Anthropic).
# Key: session_key, Value: (AIAgent, config_signature_str)
#
# OrderedDict so _enforce_agent_cache_cap() can pop the least-recently-
# used entry (move_to_end() on cache hits, popitem(last=False) for
# eviction). Hard cap via _AGENT_CACHE_MAX_SIZE, idle TTL enforced
# from _session_expiry_watcher().
import threading as _threading
-        self._agent_cache: Dict[str, tuple] = {}
+        self._agent_cache: "OrderedDict[str, tuple]" = OrderedDict()
self._agent_cache_lock = _threading.Lock()
# Per-session model overrides from /model command.
@@ -2102,6 +2116,11 @@ class GatewayRunner:
_cached_agent = self._running_agents.get(key)
if _cached_agent and _cached_agent is not _AGENT_PENDING_SENTINEL:
self._cleanup_agent_resources(_cached_agent)
# Drop the cache entry so the AIAgent (and its LLM
# clients, tool schemas, memory provider refs) can
# be garbage-collected. Otherwise the cache grows
# unbounded across the gateway's lifetime.
self._evict_cached_agent(key)
# Mark as flushed and persist to disk so the flag
# survives gateway restarts.
with self.session_store._lock:
@@ -2145,6 +2164,20 @@ class GatewayRunner:
logger.info(
"Session expiry done: %d flushed", _flushed,
)
# Sweep agents that have been idle beyond the TTL regardless
# of session reset policy. This catches sessions with very
# long / "never" reset windows, whose cached AIAgents would
# otherwise pin memory for the gateway's entire lifetime.
try:
_idle_evicted = self._sweep_idle_cached_agents()
if _idle_evicted:
logger.info(
"Agent cache idle sweep: evicted %d agent(s)",
_idle_evicted,
)
except Exception as _e:
logger.debug("Idle agent sweep failed: %s", _e)
except Exception as e:
logger.debug("Session expiry watcher error: %s", e)
# Sleep in small increments so we can stop quickly
@@ -7887,6 +7920,153 @@ class GatewayRunner:
with _lock:
self._agent_cache.pop(session_key, None)
def _release_evicted_agent_soft(self, agent: Any) -> None:
"""Soft cleanup for cache-evicted agents — preserves session tool state.
Called from _enforce_agent_cache_cap and _sweep_idle_cached_agents.
Distinct from _cleanup_agent_resources (full teardown) because a
cache-evicted session may resume at any time; its terminal
sandbox, browser daemon, and tracked bg processes must outlive
the Python AIAgent instance so the next agent built for the
same task_id inherits them.
"""
if agent is None:
return
try:
if hasattr(agent, "release_clients"):
agent.release_clients()
else:
# Older agent instance (shouldn't happen in practice) —
# fall back to the legacy full-close path.
self._cleanup_agent_resources(agent)
except Exception:
pass
def _enforce_agent_cache_cap(self) -> None:
"""Evict oldest cached agents when cache exceeds _AGENT_CACHE_MAX_SIZE.
Must be called with _agent_cache_lock held. Resource cleanup
(memory provider shutdown, tool resource close) is scheduled
on a daemon thread so the caller doesn't block on slow teardown
while holding the cache lock.
Agents currently in _running_agents are SKIPPED; their clients,
terminal sandboxes, background processes, and child subagents
are all in active use by the running turn. Evicting them would
tear down those resources mid-turn and crash the request. If
every candidate in the LRU order is active, we simply leave the
cache over the cap; it will be re-checked on the next insert.
"""
_cache = getattr(self, "_agent_cache", None)
if _cache is None:
return
# OrderedDict.popitem(last=False) pops oldest; plain dict lacks the
# arg so skip enforcement if a test fixture swapped the cache type.
if not hasattr(_cache, "move_to_end"):
return
# Snapshot of agent instances that are actively mid-turn. Use id()
# so the lookup is O(1) and doesn't depend on AIAgent.__eq__ (which
# MagicMock overrides in tests).
running_ids = {
id(a)
for a in getattr(self, "_running_agents", {}).values()
if a is not None and a is not _AGENT_PENDING_SENTINEL
}
# Walk LRU → MRU and evict excess-LRU entries that aren't mid-turn.
# We only consider entries in the first (size - cap) LRU positions
# as eviction candidates. If one of those slots is held by an
# active agent, we SKIP it without compensating by evicting a
# newer entry — that would penalise a freshly-inserted session
# (which has no cache history to retain) while protecting an
# already-cached long-running one. The cache may therefore stay
# temporarily over cap; it will re-check on the next insert,
# after active turns have finished.
excess = max(0, len(_cache) - _AGENT_CACHE_MAX_SIZE)
evict_plan: List[tuple] = [] # [(key, agent), ...]
if excess > 0:
ordered_keys = list(_cache.keys())
for key in ordered_keys[:excess]:
entry = _cache.get(key)
agent = entry[0] if isinstance(entry, tuple) and entry else None
if agent is not None and id(agent) in running_ids:
continue # active mid-turn; don't evict, don't substitute
evict_plan.append((key, agent))
for key, _ in evict_plan:
_cache.pop(key, None)
remaining_over_cap = len(_cache) - _AGENT_CACHE_MAX_SIZE
if remaining_over_cap > 0:
logger.warning(
"Agent cache over cap (%d > %d); %d excess slot(s) held by "
"mid-turn agents — will re-check on next insert.",
len(_cache), _AGENT_CACHE_MAX_SIZE, remaining_over_cap,
)
for key, agent in evict_plan:
logger.info(
"Agent cache at cap; evicting LRU session=%s (cache_size=%d)",
key, len(_cache),
)
if agent is not None:
threading.Thread(
target=self._release_evicted_agent_soft,
args=(agent,),
daemon=True,
name=f"agent-cache-evict-{key[:24]}",
).start()
def _sweep_idle_cached_agents(self) -> int:
"""Evict cached agents whose AIAgent has been idle > _AGENT_CACHE_IDLE_TTL_SECS.
Safe to call from the session expiry watcher without holding the
cache lock; it acquires the lock internally. Returns the number of entries
evicted. Resource cleanup is scheduled on daemon threads.
Agents currently in _running_agents are SKIPPED for the same reason
as _enforce_agent_cache_cap: tearing down an active turn's clients
mid-flight would crash the request.
"""
_cache = getattr(self, "_agent_cache", None)
_lock = getattr(self, "_agent_cache_lock", None)
if _cache is None or _lock is None:
return 0
now = time.time()
to_evict: List[tuple] = []
running_ids = {
id(a)
for a in getattr(self, "_running_agents", {}).values()
if a is not None and a is not _AGENT_PENDING_SENTINEL
}
with _lock:
for key, entry in list(_cache.items()):
agent = entry[0] if isinstance(entry, tuple) and entry else None
if agent is None:
continue
if id(agent) in running_ids:
continue # mid-turn — don't tear it down
last_activity = getattr(agent, "_last_activity_ts", None)
if last_activity is None:
continue
if (now - last_activity) > _AGENT_CACHE_IDLE_TTL_SECS:
to_evict.append((key, agent))
for key, _ in to_evict:
_cache.pop(key, None)
for key, agent in to_evict:
logger.info(
"Agent cache idle-TTL evict: session=%s (idle=%.0fs)",
key, now - getattr(agent, "_last_activity_ts", now),
)
threading.Thread(
target=self._release_evicted_agent_soft,
args=(agent,),
daemon=True,
name=f"agent-cache-idle-{key[:24]}",
).start()
return len(to_evict)
# ------------------------------------------------------------------
# Proxy mode: forward messages to a remote Hermes API server
# ------------------------------------------------------------------
@@ -8654,6 +8834,13 @@ class GatewayRunner:
cached = _cache.get(session_key)
if cached and cached[1] == _sig:
agent = cached[0]
# Refresh LRU order so the cap enforcement evicts
# truly-oldest entries, not the one we just used.
if hasattr(_cache, "move_to_end"):
try:
_cache.move_to_end(session_key)
except KeyError:
pass
# Reset activity timestamp so the inactivity timeout
# handler doesn't see stale idle time from the previous
# turn and immediately kill this agent. (#9051)
@@ -8692,6 +8879,7 @@ class GatewayRunner:
if _cache_lock and _cache is not None:
with _cache_lock:
_cache[session_key] = (agent, _sig)
self._enforce_agent_cache_cap()
logger.debug("Created new agent for session %s (sig=%s)", session_key, _sig)
# Per-message state — callbacks and reasoning config change every


@@ -3242,6 +3242,53 @@ class AIAgent:
except Exception:
pass
def release_clients(self) -> None:
"""Release LLM client resources WITHOUT tearing down session tool state.
Used by the gateway when evicting this agent from _agent_cache for
memory-management reasons (LRU cap or idle TTL); the session may
resume at any time with a freshly-built AIAgent that reuses the
same task_id / session_id, so we must NOT kill:
- process_registry entries for task_id (user's bg shells)
- terminal sandbox for task_id (cwd, env, shell state)
- browser daemon for task_id (open tabs, cookies)
- memory provider (has its own lifecycle; keeps running)
We DO close:
- OpenAI/httpx client pool (big chunk of held memory + sockets;
the rebuilt agent gets a fresh client anyway)
- Active child subagents (per-turn artefacts; safe to drop)
Safe to call multiple times. Distinct from close() which is the
hard teardown for actual session boundaries (/new, /reset, session
expiry).
"""
# Close active child agents (per-turn; no cross-turn persistence).
try:
with self._active_children_lock:
children = list(self._active_children)
self._active_children.clear()
for child in children:
try:
child.release_clients()
except Exception:
# Fall back to full close on children; they're per-turn.
try:
child.close()
except Exception:
pass
except Exception:
pass
# Close the OpenAI/httpx client to release sockets immediately.
try:
client = getattr(self, "client", None)
if client is not None:
self._close_openai_client(client, reason="cache_evict", shared=True)
self.client = None
except Exception:
pass
def close(self) -> None:
"""Release all resources held by this agent instance.


@@ -258,3 +258,785 @@ class TestAgentCacheLifecycle:
cb3 = lambda *a: None
agent.tool_progress_callback = cb3
assert agent.tool_progress_callback is cb3
class TestAgentCacheBoundedGrowth:
"""LRU cap and idle-TTL eviction prevent unbounded cache growth."""
def _bounded_runner(self):
"""Runner with an OrderedDict cache (matches real gateway init)."""
from collections import OrderedDict
from gateway.run import GatewayRunner
runner = GatewayRunner.__new__(GatewayRunner)
runner._agent_cache = OrderedDict()
runner._agent_cache_lock = threading.Lock()
return runner
def _fake_agent(self, last_activity: float | None = None):
"""Lightweight stand-in; real AIAgent is heavy to construct."""
m = MagicMock()
if last_activity is not None:
m._last_activity_ts = last_activity
else:
import time as _t
m._last_activity_ts = _t.time()
return m
def test_cap_evicts_lru_when_exceeded(self, monkeypatch):
"""Inserting past _AGENT_CACHE_MAX_SIZE pops the oldest entry."""
from gateway import run as gw_run
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 3)
runner = self._bounded_runner()
runner._cleanup_agent_resources = MagicMock()
for i in range(3):
runner._agent_cache[f"s{i}"] = (self._fake_agent(), f"sig{i}")
# Insert a 4th — oldest (s0) must be evicted.
with runner._agent_cache_lock:
runner._agent_cache["s3"] = (self._fake_agent(), "sig3")
runner._enforce_agent_cache_cap()
assert "s0" not in runner._agent_cache
assert "s3" in runner._agent_cache
assert len(runner._agent_cache) == 3
def test_cap_respects_move_to_end(self, monkeypatch):
"""Entries refreshed via move_to_end are NOT evicted as 'oldest'."""
from gateway import run as gw_run
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 3)
runner = self._bounded_runner()
runner._cleanup_agent_resources = MagicMock()
for i in range(3):
runner._agent_cache[f"s{i}"] = (self._fake_agent(), f"sig{i}")
# Touch s0 — it is now MRU, so s1 becomes LRU.
runner._agent_cache.move_to_end("s0")
with runner._agent_cache_lock:
runner._agent_cache["s3"] = (self._fake_agent(), "sig3")
runner._enforce_agent_cache_cap()
assert "s0" in runner._agent_cache # rescued by move_to_end
assert "s1" not in runner._agent_cache # now oldest → evicted
assert "s3" in runner._agent_cache
def test_cap_triggers_cleanup_thread(self, monkeypatch):
"""Evicted agent has release_clients() called on it (soft cleanup).
Uses the soft path (_release_evicted_agent_soft), NOT the hard
_cleanup_agent_resources path; cache eviction must not tear down
per-task state (terminal/browser/bg procs).
"""
from gateway import run as gw_run
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
runner = self._bounded_runner()
release_calls: list = []
cleanup_calls: list = []
# Intercept both paths; only release_clients path should fire.
def _soft(agent):
release_calls.append(agent)
runner._release_evicted_agent_soft = _soft
runner._cleanup_agent_resources = lambda a: cleanup_calls.append(a)
old_agent = self._fake_agent()
new_agent = self._fake_agent()
with runner._agent_cache_lock:
runner._agent_cache["old"] = (old_agent, "sig_old")
runner._agent_cache["new"] = (new_agent, "sig_new")
runner._enforce_agent_cache_cap()
# Cleanup is dispatched to a daemon thread; join briefly to observe.
import time as _t
deadline = _t.time() + 2.0
while _t.time() < deadline and not release_calls:
_t.sleep(0.02)
assert old_agent in release_calls
assert new_agent not in release_calls
# Hard-cleanup path must NOT have fired — that's for session expiry only.
assert cleanup_calls == []
def test_idle_ttl_sweep_evicts_stale_agents(self, monkeypatch):
"""_sweep_idle_cached_agents removes agents idle past the TTL."""
from gateway import run as gw_run
monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.05)
runner = self._bounded_runner()
runner._cleanup_agent_resources = MagicMock()
import time as _t
fresh = self._fake_agent(last_activity=_t.time())
stale = self._fake_agent(last_activity=_t.time() - 10.0)
runner._agent_cache["fresh"] = (fresh, "s1")
runner._agent_cache["stale"] = (stale, "s2")
evicted = runner._sweep_idle_cached_agents()
assert evicted == 1
assert "stale" not in runner._agent_cache
assert "fresh" in runner._agent_cache
def test_idle_sweep_skips_agents_without_activity_ts(self, monkeypatch):
"""Agents missing _last_activity_ts are left alone (defensive)."""
from gateway import run as gw_run
monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01)
runner = self._bounded_runner()
runner._cleanup_agent_resources = MagicMock()
no_ts = MagicMock(spec=[]) # no _last_activity_ts attribute
runner._agent_cache["s"] = (no_ts, "sig")
assert runner._sweep_idle_cached_agents() == 0
assert "s" in runner._agent_cache
def test_plain_dict_cache_is_tolerated(self):
"""Test fixtures using plain {} don't crash _enforce_agent_cache_cap."""
from gateway.run import GatewayRunner
runner = GatewayRunner.__new__(GatewayRunner)
runner._agent_cache = {} # plain dict, not OrderedDict
runner._agent_cache_lock = threading.Lock()
runner._cleanup_agent_resources = MagicMock()
# Should be a no-op rather than raising.
with runner._agent_cache_lock:
for i in range(200):
runner._agent_cache[f"s{i}"] = (MagicMock(), f"sig{i}")
runner._enforce_agent_cache_cap() # no crash, no eviction
assert len(runner._agent_cache) == 200
def test_main_lookup_updates_lru_order(self, monkeypatch):
"""Cache hit via the main-lookup path refreshes LRU position."""
runner = self._bounded_runner()
a0 = self._fake_agent()
a1 = self._fake_agent()
a2 = self._fake_agent()
runner._agent_cache["s0"] = (a0, "sig0")
runner._agent_cache["s1"] = (a1, "sig1")
runner._agent_cache["s2"] = (a2, "sig2")
# Simulate what _process_message_background does on a cache hit
# (minus the agent-state reset which isn't relevant here).
with runner._agent_cache_lock:
cached = runner._agent_cache.get("s0")
if cached and hasattr(runner._agent_cache, "move_to_end"):
runner._agent_cache.move_to_end("s0")
# After the hit, insertion order should be s1, s2, s0.
assert list(runner._agent_cache.keys()) == ["s1", "s2", "s0"]
class TestAgentCacheActiveSafety:
"""Safety: eviction must not tear down agents currently mid-turn.
AIAgent.close() kills process_registry entries for the task, cleans
the terminal sandbox, closes the OpenAI client, and cascades
.close() into active child subagents. Calling it while the agent
is still processing would crash the in-flight request. These tests
pin that eviction skips any agent present in _running_agents.
"""
def _runner(self):
from collections import OrderedDict
from gateway.run import GatewayRunner
runner = GatewayRunner.__new__(GatewayRunner)
runner._agent_cache = OrderedDict()
runner._agent_cache_lock = threading.Lock()
runner._running_agents = {}
return runner
def _fake_agent(self, idle_seconds: float = 0.0):
import time as _t
m = MagicMock()
m._last_activity_ts = _t.time() - idle_seconds
return m
def test_cap_skips_active_lru_entry(self, monkeypatch):
"""Active LRU entry is skipped; cache stays over cap rather than
compensating by evicting a newer entry.
Rationale: evicting a more-recent entry just because the oldest
slot is temporarily locked would punish the most recently-
inserted session (which has no cache to preserve) to protect
one that happens to be mid-turn. Better to let the cache stay
transiently over cap and re-check on the next insert.
"""
from gateway import run as gw_run
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 2)
runner = self._runner()
runner._cleanup_agent_resources = MagicMock()
active = self._fake_agent()
idle_a = self._fake_agent()
idle_b = self._fake_agent()
# Insertion order: active (oldest), idle_a, idle_b.
runner._agent_cache["session-active"] = (active, "sig")
runner._agent_cache["session-idle-a"] = (idle_a, "sig")
runner._agent_cache["session-idle-b"] = (idle_b, "sig")
# Mark `active` as mid-turn — it's LRU, but protected.
runner._running_agents["session-active"] = active
with runner._agent_cache_lock:
runner._enforce_agent_cache_cap()
# All three remain; no eviction ran, no cleanup dispatched.
assert "session-active" in runner._agent_cache
assert "session-idle-a" in runner._agent_cache
assert "session-idle-b" in runner._agent_cache
assert runner._cleanup_agent_resources.call_count == 0
def test_cap_evicts_when_multiple_excess_and_some_inactive(self, monkeypatch):
"""Mixed active/idle in the LRU excess window: only the idle ones go.
With CAP=2 and 4 entries, excess=2 (the two oldest). If the
oldest is active and the next is idle, we evict exactly one.
Cache ends at CAP+1, which is still better than unbounded.
"""
from gateway import run as gw_run
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 2)
runner = self._runner()
runner._cleanup_agent_resources = MagicMock()
oldest_active = self._fake_agent()
idle_second = self._fake_agent()
idle_third = self._fake_agent()
idle_fourth = self._fake_agent()
runner._agent_cache["s1"] = (oldest_active, "sig")
runner._agent_cache["s2"] = (idle_second, "sig") # in excess window, idle
runner._agent_cache["s3"] = (idle_third, "sig")
runner._agent_cache["s4"] = (idle_fourth, "sig")
runner._running_agents["s1"] = oldest_active # oldest is mid-turn
with runner._agent_cache_lock:
runner._enforce_agent_cache_cap()
# s1 protected (active), s2 evicted (idle + in excess window),
# s3 and s4 untouched (outside excess window).
assert "s1" in runner._agent_cache
assert "s2" not in runner._agent_cache
assert "s3" in runner._agent_cache
assert "s4" in runner._agent_cache
def test_cap_leaves_cache_over_limit_if_all_active(self, monkeypatch, caplog):
"""If every over-cap entry is mid-turn, the cache stays over cap.
Better to temporarily exceed the cap than to crash an in-flight
turn by tearing down its clients.
"""
from gateway import run as gw_run
import logging as _logging
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
runner = self._runner()
runner._cleanup_agent_resources = MagicMock()
a1 = self._fake_agent()
a2 = self._fake_agent()
a3 = self._fake_agent()
runner._agent_cache["s1"] = (a1, "sig")
runner._agent_cache["s2"] = (a2, "sig")
runner._agent_cache["s3"] = (a3, "sig")
# All three are mid-turn.
runner._running_agents["s1"] = a1
runner._running_agents["s2"] = a2
runner._running_agents["s3"] = a3
with caplog.at_level(_logging.WARNING, logger="gateway.run"):
with runner._agent_cache_lock:
runner._enforce_agent_cache_cap()
# Cache unchanged because eviction had to skip every candidate.
assert len(runner._agent_cache) == 3
# _cleanup_agent_resources must NOT have been scheduled.
assert runner._cleanup_agent_resources.call_count == 0
# And we logged a warning so operators can see the condition.
assert any("mid-turn" in r.message for r in caplog.records)
def test_cap_pending_sentinel_does_not_block_eviction(self, monkeypatch):
"""_AGENT_PENDING_SENTINEL in _running_agents is treated as 'not active'.
The sentinel is set while an agent is being CONSTRUCTED, before the
real AIAgent instance exists. Cached agents from other sessions
can still be evicted safely.
"""
from gateway import run as gw_run
from gateway.run import _AGENT_PENDING_SENTINEL
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
runner = self._runner()
runner._cleanup_agent_resources = MagicMock()
a1 = self._fake_agent()
a2 = self._fake_agent()
runner._agent_cache["s1"] = (a1, "sig")
runner._agent_cache["s2"] = (a2, "sig")
# Another session is mid-creation — sentinel, no real agent yet.
runner._running_agents["s3-being-created"] = _AGENT_PENDING_SENTINEL
with runner._agent_cache_lock:
runner._enforce_agent_cache_cap()
assert "s1" not in runner._agent_cache # evicted normally
assert "s2" in runner._agent_cache
def test_idle_sweep_skips_active_agent(self, monkeypatch):
"""Idle-TTL sweep must not tear down an active agent even if 'stale'."""
from gateway import run as gw_run
monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01)
runner = self._runner()
runner._cleanup_agent_resources = MagicMock()
old_but_active = self._fake_agent(idle_seconds=10.0)
runner._agent_cache["s1"] = (old_but_active, "sig")
runner._running_agents["s1"] = old_but_active
evicted = runner._sweep_idle_cached_agents()
assert evicted == 0
assert "s1" in runner._agent_cache
assert runner._cleanup_agent_resources.call_count == 0
def test_eviction_does_not_close_active_agent_client(self, monkeypatch):
"""Live test: evicting an active agent does NOT null its .client.
This reproduces the original concern: if eviction fired while an
agent was mid-turn, `agent.close()` would set `self.client = None`
and the next API call inside the loop would crash. With the
active-agent skip, the client stays intact.
"""
from gateway import run as gw_run
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
runner = self._runner()
# Build a proper fake agent whose close() matches AIAgent's contract.
active = MagicMock()
active._last_activity_ts = __import__("time").time()
active.client = MagicMock() # simulate an OpenAI client
def _real_close():
active.client = None # mirrors run_agent.py:3299
active.close = _real_close
active.shutdown_memory_provider = MagicMock()
idle = self._fake_agent()
runner._agent_cache["active-session"] = (active, "sig")
runner._agent_cache["idle-session"] = (idle, "sig")
runner._running_agents["active-session"] = active
# Real cleanup function, not mocked — we want to see whether close()
# runs on the active agent. (It shouldn't.)
with runner._agent_cache_lock:
runner._enforce_agent_cache_cap()
# Let any eviction cleanup threads drain.
import time as _t
_t.sleep(0.2)
# The ACTIVE agent's client must still be usable.
assert active.client is not None, (
"Active agent's client was closed by eviction — "
"running turn would crash on its next API call."
)
class TestAgentCacheSpilloverLive:
"""Live E2E: fill cache with real AIAgent instances and stress it."""
def _runner(self):
from collections import OrderedDict
from gateway.run import GatewayRunner
runner = GatewayRunner.__new__(GatewayRunner)
runner._agent_cache = OrderedDict()
runner._agent_cache_lock = threading.Lock()
runner._running_agents = {}
return runner
def _real_agent(self):
"""A genuine AIAgent; no API calls are made during these tests."""
from run_agent import AIAgent
return AIAgent(
model="anthropic/claude-sonnet-4", api_key="test",
base_url="https://openrouter.ai/api/v1", provider="openrouter",
max_iterations=5, quiet_mode=True,
skip_context_files=True, skip_memory=True,
platform="telegram",
)
def test_fill_to_cap_then_spillover(self, monkeypatch):
"""Fill to cap with real agents, insert one more, oldest evicted."""
from gateway import run as gw_run
CAP = 8
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP)
runner = self._runner()
agents = [self._real_agent() for _ in range(CAP)]
for i, a in enumerate(agents):
with runner._agent_cache_lock:
runner._agent_cache[f"s{i}"] = (a, "sig")
runner._enforce_agent_cache_cap()
assert len(runner._agent_cache) == CAP
# Spillover insertion.
newcomer = self._real_agent()
with runner._agent_cache_lock:
runner._agent_cache["new"] = (newcomer, "sig")
runner._enforce_agent_cache_cap()
# Oldest (s0) evicted, cap still CAP.
assert "s0" not in runner._agent_cache
assert "new" in runner._agent_cache
assert len(runner._agent_cache) == CAP
# Clean up so pytest doesn't leak resources.
for a in agents + [newcomer]:
try:
a.close()
except Exception:
pass
def test_spillover_all_active_keeps_cache_over_cap(self, monkeypatch, caplog):
"""Every slot active: cache goes over cap, no one gets torn down."""
from gateway import run as gw_run
import logging as _logging
CAP = 4
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP)
runner = self._runner()
agents = [self._real_agent() for _ in range(CAP)]
for i, a in enumerate(agents):
runner._agent_cache[f"s{i}"] = (a, "sig")
runner._running_agents[f"s{i}"] = a # every session mid-turn
newcomer = self._real_agent()
with caplog.at_level(_logging.WARNING, logger="gateway.run"):
with runner._agent_cache_lock:
runner._agent_cache["new"] = (newcomer, "sig")
runner._enforce_agent_cache_cap()
assert len(runner._agent_cache) == CAP + 1 # temporarily over cap
# All existing agents still usable.
for i, a in enumerate(agents):
assert a.client is not None, f"s{i} got closed while active!"
# And we warned operators.
assert any("mid-turn" in r.message for r in caplog.records)
for a in agents + [newcomer]:
try:
a.close()
except Exception:
pass
def test_concurrent_inserts_settle_at_cap(self, monkeypatch):
"""Many threads inserting in parallel end with len(cache) == CAP."""
from gateway import run as gw_run
CAP = 16
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP)
runner = self._runner()
N_THREADS = 8
PER_THREAD = 20 # 8 * 20 = 160 inserts into a 16-slot cache
def worker(tid: int):
for j in range(PER_THREAD):
a = self._real_agent()
key = f"t{tid}-s{j}"
with runner._agent_cache_lock:
runner._agent_cache[key] = (a, "sig")
runner._enforce_agent_cache_cap()
threads = [
threading.Thread(target=worker, args=(t,), daemon=True)
for t in range(N_THREADS)
]
for t in threads:
t.start()
for t in threads:
t.join(timeout=30)
assert not t.is_alive(), "Worker thread hung — possible deadlock?"
# Let daemon cleanup threads settle.
import time as _t
_t.sleep(0.5)
assert len(runner._agent_cache) == CAP, (
f"Expected exactly {CAP} entries after concurrent inserts, "
f"got {len(runner._agent_cache)}."
)
def test_evicted_session_next_turn_gets_fresh_agent(self, monkeypatch):
"""After eviction, the same session_key can insert a fresh agent.
Simulates the real spillover flow: evicted session sends another
message, which builds a new AIAgent and re-enters the cache.
"""
from gateway import run as gw_run
CAP = 2
monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP)
runner = self._runner()
a0 = self._real_agent()
a1 = self._real_agent()
runner._agent_cache["sA"] = (a0, "sig")
runner._agent_cache["sB"] = (a1, "sig")
# 3rd session forces sA (oldest) out.
a2 = self._real_agent()
with runner._agent_cache_lock:
runner._agent_cache["sC"] = (a2, "sig")
runner._enforce_agent_cache_cap()
assert "sA" not in runner._agent_cache
# Let the eviction cleanup thread run.
import time as _t
_t.sleep(0.3)
# Now sA's user sends another message → a fresh agent goes in.
a0_new = self._real_agent()
with runner._agent_cache_lock:
runner._agent_cache["sA"] = (a0_new, "sig")
runner._enforce_agent_cache_cap()
assert "sA" in runner._agent_cache
assert runner._agent_cache["sA"][0] is a0_new # the new one, not stale
# Fresh agent is usable.
assert a0_new.client is not None
for a in (a0, a1, a2, a0_new):
try:
a.close()
except Exception:
pass
class TestAgentCacheIdleResume:
"""End-to-end: idle-TTL-evicted session resumes cleanly with task state.
Real-world scenario: user leaves a Telegram session open for 2+ hours.
Idle-TTL evicts their cached agent. They come back and send a message.
The new agent built for the same session_id must inherit:
    - Conversation history (from SessionStore; outside the cache's concern)
    - Terminal sandbox (same task_id → same _active_environments entry)
    - Browser daemon (same task_id → same browser session)
    - Background processes (same task_id → same process_registry entries)
The ONLY thing that should reset is the LLM client pool (rebuilt fresh).
"""
def _runner(self):
from collections import OrderedDict
from gateway.run import GatewayRunner
runner = GatewayRunner.__new__(GatewayRunner)
runner._agent_cache = OrderedDict()
runner._agent_cache_lock = threading.Lock()
runner._running_agents = {}
return runner
def test_release_clients_does_not_touch_process_registry(self, monkeypatch):
"""release_clients must not call process_registry.kill_all for task_id."""
from run_agent import AIAgent
agent = AIAgent(
model="anthropic/claude-sonnet-4", api_key="test",
base_url="https://openrouter.ai/api/v1", provider="openrouter",
max_iterations=5, quiet_mode=True,
skip_context_files=True, skip_memory=True,
session_id="idle-resume-test-session",
)
# Spy on process_registry.kill_all — it MUST NOT be called.
from tools import process_registry as _pr
kill_all_calls: list = []
original_kill_all = _pr.process_registry.kill_all
_pr.process_registry.kill_all = lambda **kw: kill_all_calls.append(kw)
try:
agent.release_clients()
finally:
_pr.process_registry.kill_all = original_kill_all
try:
agent.close()
except Exception:
pass
assert kill_all_calls == [], (
f"release_clients() called process_registry.kill_all — would "
f"kill user's bg processes on cache eviction. Calls: {kill_all_calls}"
)
def test_release_clients_does_not_touch_terminal_or_browser(self, monkeypatch):
"""release_clients must not call cleanup_vm or cleanup_browser."""
from run_agent import AIAgent
from tools import terminal_tool as _tt
from tools import browser_tool as _bt
agent = AIAgent(
model="anthropic/claude-sonnet-4", api_key="test",
base_url="https://openrouter.ai/api/v1", provider="openrouter",
max_iterations=5, quiet_mode=True,
skip_context_files=True, skip_memory=True,
session_id="idle-resume-test-2",
)
vm_calls: list = []
browser_calls: list = []
original_vm = _tt.cleanup_vm
original_browser = _bt.cleanup_browser
_tt.cleanup_vm = lambda tid: vm_calls.append(tid)
_bt.cleanup_browser = lambda tid: browser_calls.append(tid)
try:
agent.release_clients()
finally:
_tt.cleanup_vm = original_vm
_bt.cleanup_browser = original_browser
try:
agent.close()
except Exception:
pass
assert vm_calls == [], (
f"release_clients() tore down terminal sandbox — user's cwd, "
f"env, and bg shells would be gone on resume. Calls: {vm_calls}"
)
assert browser_calls == [], (
f"release_clients() tore down browser session — user's open "
f"tabs and cookies gone on resume. Calls: {browser_calls}"
)
def test_release_clients_closes_llm_client(self):
"""release_clients IS expected to close the OpenAI/httpx client."""
from run_agent import AIAgent
agent = AIAgent(
model="anthropic/claude-sonnet-4", api_key="test",
base_url="https://openrouter.ai/api/v1", provider="openrouter",
max_iterations=5, quiet_mode=True,
skip_context_files=True, skip_memory=True,
)
# Clients are lazy-built; force one to exist so we can verify close.
assert agent.client is not None # __init__ builds it
agent.release_clients()
# Post-release: client reference is dropped (memory freed).
assert agent.client is None
def test_close_vs_release_full_teardown_difference(self, monkeypatch):
"""close() tears down task state; release_clients() does not.
This pins the semantic contract: session-expiry path uses close()
        (full teardown: the session is done); the cache-eviction path uses
        release_clients() (soft teardown: the session may resume).
"""
from run_agent import AIAgent
from tools import terminal_tool as _tt
# Agent A: evicted from cache (soft) — terminal survives.
# Agent B: session expired (hard) — terminal torn down.
agent_a = AIAgent(
model="anthropic/claude-sonnet-4", api_key="test",
base_url="https://openrouter.ai/api/v1", provider="openrouter",
max_iterations=5, quiet_mode=True,
skip_context_files=True, skip_memory=True,
session_id="soft-session",
)
agent_b = AIAgent(
model="anthropic/claude-sonnet-4", api_key="test",
base_url="https://openrouter.ai/api/v1", provider="openrouter",
max_iterations=5, quiet_mode=True,
skip_context_files=True, skip_memory=True,
session_id="hard-session",
)
vm_calls: list = []
original_vm = _tt.cleanup_vm
_tt.cleanup_vm = lambda tid: vm_calls.append(tid)
try:
agent_a.release_clients() # cache eviction
agent_b.close() # session expiry
finally:
_tt.cleanup_vm = original_vm
try:
agent_a.close()
except Exception:
pass
# Only agent_b's task_id should appear in cleanup calls.
assert "hard-session" in vm_calls
assert "soft-session" not in vm_calls
def test_idle_evicted_session_rebuild_inherits_task_id(self, monkeypatch):
"""After idle-TTL eviction, a fresh agent with the same session_id
gets the same task_id so tool state (terminal/browser/bg procs)
that persisted across eviction is reachable via the new agent.
"""
from gateway import run as gw_run
from run_agent import AIAgent
monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01)
runner = self._runner()
# Build an agent representing a stale (idle) session.
SESSION_ID = "long-lived-user-session"
old = AIAgent(
model="anthropic/claude-sonnet-4", api_key="test",
base_url="https://openrouter.ai/api/v1", provider="openrouter",
max_iterations=5, quiet_mode=True,
skip_context_files=True, skip_memory=True,
session_id=SESSION_ID,
)
old._last_activity_ts = 0.0 # force idle
runner._agent_cache["sKey"] = (old, "sig")
# Simulate the idle-TTL sweep firing.
runner._sweep_idle_cached_agents()
assert "sKey" not in runner._agent_cache
# Wait for the daemon thread doing release_clients() to finish.
import time as _t
_t.sleep(0.3)
# Old agent's client is gone (soft cleanup fired).
assert old.client is None
# User comes back — new agent built for the SAME session_id.
new_agent = AIAgent(
model="anthropic/claude-sonnet-4", api_key="test",
base_url="https://openrouter.ai/api/v1", provider="openrouter",
max_iterations=5, quiet_mode=True,
skip_context_files=True, skip_memory=True,
session_id=SESSION_ID,
)
# Same session_id means same task_id routed to tools. The new
# agent inherits any per-task state (terminal sandbox etc.) that
# was preserved across eviction.
assert new_agent.session_id == old.session_id == SESSION_ID
# And it has a fresh working client.
assert new_agent.client is not None
try:
new_agent.close()
except Exception:
pass
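The LRU-refresh behavior these tests rely on (a cache hit calls move_to_end(), so the hit entry escapes eviction) can be sketched with a plain OrderedDict. This is an illustrative sketch under local names (`get`, `put`, `CAP`), not the GatewayRunner code.

```python
from collections import OrderedDict

# A 2-slot LRU cache: hitting a key refreshes its recency via move_to_end(),
# so the untouched key becomes the eviction victim when the cap is exceeded.
cache: "OrderedDict[str, str]" = OrderedDict()
CAP = 2


def get(key: str):
    val = cache.get(key)
    if val is not None:
        cache.move_to_end(key)  # cache hit refreshes LRU order
    return val


def put(key: str, val: str) -> None:
    cache[key] = val
    cache.move_to_end(key)  # newest entry goes to the MRU end
    while len(cache) > CAP:
        cache.popitem(last=False)  # pop the least-recently-used entry


put("sA", "agent-A")
put("sB", "agent-B")
get("sA")             # sA is now most-recently-used
put("sC", "agent-C")  # sB, not sA, is evicted
assert "sA" in cache and "sB" not in cache
```

The same mechanism is why `test_fill_to_cap_then_spillover` sees `s0` evicted first: nothing touched `s0` after insertion, so it stayed at the LRU end.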