diff --git a/gateway/run.py b/gateway/run.py
index 694bbfe628..5faf6dee06 100644
--- a/gateway/run.py
+++ b/gateway/run.py
@@ -3364,17 +3364,15 @@ class GatewayRunner:
             logger.debug("Gateway memory flush on reset failed: %s", e)
         # Close tool resources on the old agent (terminal sandboxes, browser
         # daemons, background processes) before evicting from cache.
-        _lock = getattr(self, "_agent_cache_lock", None)
-        if _lock:
-            with _lock:
-                _cached = self._agent_cache.get(session_key)
-                _old_agent = _cached[0] if isinstance(_cached, tuple) else _cached if _cached else None
-            if _old_agent is not None:
-                try:
-                    if hasattr(_old_agent, "close"):
-                        _old_agent.close()
-                except Exception:
-                    pass
+        with self._agent_cache_lock:
+            _cached = self._agent_cache.get(session_key)
+            _old_agent = _cached[0] if isinstance(_cached, tuple) else _cached if _cached else None
+        if _old_agent is not None:
+            try:
+                if hasattr(_old_agent, "close"):
+                    _old_agent.close()
+            except Exception as e:
+                logger.debug("Gateway agent close on reset failed: %s", e)
         self._evict_cached_agent(session_key)
 
         try:
diff --git a/tests/tools/test_zombie_process_cleanup.py b/tests/tools/test_zombie_process_cleanup.py
new file mode 100644
index 0000000000..9cbbbcd1fd
--- /dev/null
+++ b/tests/tools/test_zombie_process_cleanup.py
@@ -0,0 +1,277 @@
+"""Tests for zombie process cleanup — verifies processes spawned by tools
+are properly reaped when agent sessions end.
+
+Reproduction for issue #7131: zombie process accumulation on long-running
+gateway deployments.
+"""
+
+import os
+import signal
+import subprocess
+import sys
+import time
+import threading
+
+import pytest
+
+
+def _spawn_sleep(seconds: float = 60) -> subprocess.Popen:
+    """Spawn a portable long-lived Python sleep process (no shell wrapper)."""
+    return subprocess.Popen(
+        [sys.executable, "-c", f"import time; time.sleep({seconds})"],
+    )
+
+
+def _pid_alive(pid: int) -> bool:
+    """Return True if a process with the given PID is still running."""
+    try:
+        os.kill(pid, 0)
+    except ProcessLookupError:
+        return False
+    except PermissionError:
+        # EPERM: the PID exists but is owned by another user.
+        return True
+    return True
+
+
+class TestZombieReproduction:
+    """Demonstrate that subprocesses survive when cleanup is not called."""
+
+    def test_orphaned_processes_survive_without_cleanup(self):
+        """REPRODUCTION: processes spawned directly survive if no one kills
+        them — this models the gap that causes zombie accumulation when
+        the gateway drops agent references without calling close()."""
+        pids = []
+
+        try:
+            for _ in range(3):
+                proc = _spawn_sleep(60)
+                pids.append(proc.pid)
+
+            for pid in pids:
+                assert _pid_alive(pid), f"PID {pid} should be alive after spawn"
+
+            # Simulate "session end" by just dropping the reference
+            del proc  # noqa: F821
+
+            # BUG: processes are still alive after reference is dropped
+            for pid in pids:
+                assert _pid_alive(pid), (
+                    f"PID {pid} died after ref drop — "
+                    f"expected it to survive (demonstrating the bug)"
+                )
+        finally:
+            for pid in pids:
+                try:
+                    os.kill(pid, signal.SIGKILL)
+                except (ProcessLookupError, PermissionError):
+                    pass
+
+    def test_explicit_terminate_reaps_processes(self):
+        """Explicitly terminating+waiting on Popen handles works.
+        This models what ProcessRegistry.kill_process does internally."""
+        procs = []
+
+        try:
+            for _ in range(3):
+                proc = _spawn_sleep(60)
+                procs.append(proc)
+
+            for proc in procs:
+                assert _pid_alive(proc.pid)
+
+            for proc in procs:
+                proc.terminate()
+                proc.wait(timeout=5)
+
+            for proc in procs:
+                assert proc.returncode is not None, (
+                    f"PID {proc.pid} should have exited after terminate+wait"
+                )
+        finally:
+            for proc in procs:
+                try:
+                    proc.kill()
+                    proc.wait(timeout=1)
+                except Exception:
+                    pass
+
+
+class TestAgentCloseMethod:
+    """Verify AIAgent.close() exists, is idempotent, and calls cleanup."""
+
+    def test_close_calls_cleanup_functions(self):
+        """close() should call kill_all, cleanup_vm, cleanup_browser."""
+        from unittest.mock import patch
+
+        with patch("run_agent.AIAgent.__init__", return_value=None):
+            from run_agent import AIAgent
+            agent = AIAgent.__new__(AIAgent)
+            agent.session_id = "test-close-cleanup"
+            agent._active_children = []
+            agent._active_children_lock = threading.Lock()
+            agent.client = None
+
+            with patch("tools.process_registry.process_registry") as mock_registry, \
+                 patch("tools.terminal_tool.cleanup_vm") as mock_cleanup_vm, \
+                 patch("tools.browser_tool.cleanup_browser") as mock_cleanup_browser:
+                agent.close()
+
+            mock_registry.kill_all.assert_called_once_with(
+                task_id="test-close-cleanup"
+            )
+            mock_cleanup_vm.assert_called_once_with("test-close-cleanup")
+            mock_cleanup_browser.assert_called_once_with("test-close-cleanup")
+
+    def test_close_is_idempotent(self):
+        """close() can be called multiple times without error."""
+        from unittest.mock import patch
+
+        with patch("run_agent.AIAgent.__init__", return_value=None):
+            from run_agent import AIAgent
+            agent = AIAgent.__new__(AIAgent)
+            agent.session_id = "test-close-idempotent"
+            agent._active_children = []
+            agent._active_children_lock = threading.Lock()
+            agent.client = None
+
+            agent.close()
+            agent.close()
+            agent.close()
+
+    def test_close_propagates_to_children(self):
+        """close() should call close() on all active child agents."""
+        from unittest.mock import MagicMock, patch
+
+        with patch("run_agent.AIAgent.__init__", return_value=None):
+            from run_agent import AIAgent
+            agent = AIAgent.__new__(AIAgent)
+            agent.session_id = "test-close-children"
+            agent._active_children_lock = threading.Lock()
+            agent.client = None
+
+            child_1 = MagicMock()
+            child_2 = MagicMock()
+            agent._active_children = [child_1, child_2]
+
+            agent.close()
+
+            child_1.close.assert_called_once()
+            child_2.close.assert_called_once()
+            assert agent._active_children == []
+
+    def test_close_survives_partial_failures(self):
+        """close() continues cleanup even if one step fails."""
+        from unittest.mock import patch
+
+        with patch("run_agent.AIAgent.__init__", return_value=None):
+            from run_agent import AIAgent
+            agent = AIAgent.__new__(AIAgent)
+            agent.session_id = "test-close-partial"
+            agent._active_children = []
+            agent._active_children_lock = threading.Lock()
+            agent.client = None
+
+            with patch(
+                "tools.process_registry.process_registry"
+            ) as mock_reg, patch(
+                "tools.terminal_tool.cleanup_vm"
+            ) as mock_vm, patch(
+                "tools.browser_tool.cleanup_browser"
+            ) as mock_browser:
+                mock_reg.kill_all.side_effect = RuntimeError("boom")
+
+                agent.close()
+
+            mock_vm.assert_called_once()
+            mock_browser.assert_called_once()
+
+
+class TestGatewayCleanupWiring:
+    """Verify gateway lifecycle calls close() on agents."""
+
+    def test_gateway_stop_calls_close(self):
+        """gateway stop() should call close() on all running agents."""
+        import asyncio
+        from unittest.mock import MagicMock, patch
+
+        runner = MagicMock()
+        runner._running = True
+        runner._running_agents = {}
+        runner.adapters = {}
+        runner._background_tasks = set()
+        runner._pending_messages = {}
+        runner._pending_approvals = {}
+        runner._shutdown_event = asyncio.Event()
+        runner._exit_reason = None
+
+        mock_agent_1 = MagicMock()
+        mock_agent_2 = MagicMock()
+        runner._running_agents = {
+            "session-1": mock_agent_1,
+            "session-2": mock_agent_2,
+        }
+
+        from gateway.run import GatewayRunner
+
+        loop = asyncio.new_event_loop()
+        try:
+            with patch("gateway.status.remove_pid_file"), \
+                 patch("gateway.status.write_runtime_status"), \
+                 patch("tools.terminal_tool.cleanup_all_environments"), \
+                 patch("tools.browser_tool.cleanup_all_browsers"):
+                loop.run_until_complete(GatewayRunner.stop(runner))
+        finally:
+            loop.close()
+
+        mock_agent_1.close.assert_called()
+        mock_agent_2.close.assert_called()
+
+    def test_evict_does_not_call_close(self):
+        """_evict_cached_agent() should NOT call close() — it's also used
+        for non-destructive refreshes (model switch, branch, fallback)."""
+        import threading
+        from unittest.mock import MagicMock
+
+        from gateway.run import GatewayRunner
+
+        runner = object.__new__(GatewayRunner)
+        runner._agent_cache_lock = threading.Lock()
+
+        mock_agent = MagicMock()
+        runner._agent_cache = {"session-key": (mock_agent, 12345)}
+
+        GatewayRunner._evict_cached_agent(runner, "session-key")
+
+        mock_agent.close.assert_not_called()
+        assert "session-key" not in runner._agent_cache
+
+
+class TestDelegationCleanup:
+    """Verify subagent delegation cleans up child agents."""
+
+    def test_run_single_child_calls_close(self):
+        """_run_single_child finally block should call close() on child."""
+        from unittest.mock import MagicMock
+        from tools.delegate_tool import _run_single_child
+
+        parent = MagicMock()
+        parent._active_children = []
+        parent._active_children_lock = threading.Lock()
+
+        child = MagicMock()
+        child._delegate_saved_tool_names = ["tool1"]
+        child.run_conversation.side_effect = RuntimeError("test abort")
+
+        parent._active_children.append(child)
+
+        result = _run_single_child(
+            task_index=0,
+            goal="test goal",
+            child=child,
+            parent_agent=parent,
+        )
+
+        child.close.assert_called_once()
+        assert child not in parent._active_children
+        assert result["status"] == "error"