From e5dc569daac34ba0a5f82069ce78c6fb7a25917c Mon Sep 17 00:00:00 2001 From: Himess Date: Sat, 14 Mar 2026 11:03:20 -0700 Subject: [PATCH] fix: salvage gateway dedup and executor cleanup from PR #993 Salvages the two still-relevant fixes from PR #993 onto current main: - use a 3-tuple LOCAL delivery key so explicit/local-origin targets are not duplicated - shut down the previous agent-loop ThreadPoolExecutor when resizing the global pool Adds regression tests for both behaviors. --- environments/agent_loop.py | 2 ++ gateway/delivery.py | 2 +- tests/gateway/test_delivery.py | 11 ++++++++++- tests/test_agent_loop.py | 19 +++++++++++++++++++ 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/environments/agent_loop.py b/environments/agent_loop.py index ab8c0236e..dec3bc4ec 100644 --- a/environments/agent_loop.py +++ b/environments/agent_loop.py @@ -39,7 +39,9 @@ def resize_tool_pool(max_workers: int): Safe to call before any tasks are submitted. """ global _tool_executor + old_executor = _tool_executor _tool_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) + old_executor.shutdown(wait=False) logger.info("Tool thread pool resized to %d workers", max_workers) logger = logging.getLogger(__name__) diff --git a/gateway/delivery.py b/gateway/delivery.py index 630ab638e..69ec6376c 100644 --- a/gateway/delivery.py +++ b/gateway/delivery.py @@ -161,7 +161,7 @@ class DeliveryRouter: # Always include local if configured if self.config.always_log_local: - local_key = (Platform.LOCAL, None) + local_key = (Platform.LOCAL, None, None) if local_key not in seen_platforms: targets.append(DeliveryTarget(platform=Platform.LOCAL)) diff --git a/tests/gateway/test_delivery.py b/tests/gateway/test_delivery.py index 42eba781e..3894897f4 100644 --- a/tests/gateway/test_delivery.py +++ b/tests/gateway/test_delivery.py @@ -1,7 +1,7 @@ """Tests for the delivery routing module.""" from gateway.config import Platform, GatewayConfig, PlatformConfig, HomeChannel -from gateway.delivery import DeliveryTarget, parse_deliver_spec +from gateway.delivery import DeliveryRouter, DeliveryTarget, parse_deliver_spec from gateway.session import SessionSource @@ -85,3 +85,12 @@ class TestTargetToStringRoundtrip: reparsed = DeliveryTarget.parse(s) assert reparsed.platform == Platform.TELEGRAM assert reparsed.chat_id == "999" + + +class TestDeliveryRouter: + def test_resolve_targets_does_not_duplicate_local_when_explicit(self): + router = DeliveryRouter(GatewayConfig(always_log_local=True)) + + targets = router.resolve_targets(["local"]) + + assert [target.platform for target in targets] == [Platform.LOCAL] diff --git a/tests/test_agent_loop.py b/tests/test_agent_loop.py index bb0ccd069..b95ff7808 100644 --- a/tests/test_agent_loop.py +++ b/tests/test_agent_loop.py @@ -484,3 +484,22 @@ class TestResizeToolPool: """resize_tool_pool should not raise.""" resize_tool_pool(16) # Small pool for testing resize_tool_pool(128) # Restore default + + def test_resize_shuts_down_previous_executor(self, monkeypatch): + """Replacing the global tool executor should shut down the old pool.""" + import environments.agent_loop as agent_loop_module + + old_executor = MagicMock() + new_executor = MagicMock() + + monkeypatch.setattr(agent_loop_module, "_tool_executor", old_executor) + monkeypatch.setattr( + agent_loop_module.concurrent.futures, + "ThreadPoolExecutor", + MagicMock(return_value=new_executor), + ) + + resize_tool_pool(16) + + old_executor.shutdown.assert_called_once_with(wait=False) + assert agent_loop_module._tool_executor is new_executor