diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index 38532e66b..cdba5f60e 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -606,6 +606,56 @@ class TestSegmentBreakOnToolBoundary: assert sent_texts[0].startswith(prefix) assert sum(len(t) for t in sent_texts[1:]) == len(tail) + @pytest.mark.asyncio + async def test_fallback_final_sends_full_text_at_tool_boundary(self): + """After a tool call, the streamed prefix is stale (from the pre-tool + segment). _send_fallback_final must still send the post-tool response + even when continuation_text calculates as empty (#10807).""" + adapter = MagicMock() + adapter.send = AsyncMock( + return_value=SimpleNamespace(success=True, message_id="msg_1"), + ) + adapter.edit_message = AsyncMock( + return_value=SimpleNamespace(success=True), + ) + adapter.MAX_MESSAGE_LENGTH = 4096 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) + consumer = GatewayStreamConsumer(adapter, "chat_123", config) + + # Simulate a pre-tool streamed segment that becomes the visible prefix + pre_tool_text = "I'll run that code now." + consumer.on_delta(pre_tool_text) + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + + # After the tool call, the model returns a SHORT final response that + # does NOT start with the pre-tool prefix. The continuation calculator + # would return empty (no prefix match → full text returned, but if the + # streaming edit already showed pre_tool_text, the prefix-based logic + # wrongly matches). Simulate this by setting _last_sent_text to the + # pre-tool content, then finishing with different post-tool content. + consumer._last_sent_text = pre_tool_text + post_tool_response = "⏰ Script timed out after 30s and was killed." + consumer.finish() + await task + + # The fallback should send the post-tool response via + # _send_fallback_final. + await consumer._send_fallback_final(post_tool_response) + + # Verify the final text was sent (not silently dropped) + sent = False + for call in adapter.send.call_args_list: + content = call[1].get("content", call[0][0] if call[0] else "") + if "timed out" in str(content): + sent = True + break + assert sent, ( + "Post-tool timeout response was silently dropped by " + "_send_fallback_final — the #10807 fix should prevent this" + ) + class TestInterimCommentaryMessages: @pytest.mark.asyncio diff --git a/tests/tools/test_code_execution.py b/tests/tools/test_code_execution.py index d2fbc7c10..15f8faa9b 100644 --- a/tests/tools/test_code_execution.py +++ b/tests/tools/test_code_execution.py @@ -279,6 +279,10 @@ raise RuntimeError("deliberate crash") )) self.assertEqual(result["status"], "timeout") self.assertIn("timed out", result.get("error", "")) + # The timeout message must also appear in output so the LLM always + # surfaces it to the user (#10807). + self.assertIn("timed out", result.get("output", "")) + self.assertIn("\u23f0", result.get("output", "")) def test_web_search_tool(self): """Script calls web_search and processes results.""" diff --git a/tools/code_execution_tool.py b/tools/code_execution_tool.py index d61164bca..3e7e3f925 100644 --- a/tools/code_execution_tool.py +++ b/tools/code_execution_tool.py @@ -1128,8 +1128,10 @@ def execute_code( stderr_reader.start() status = "success" - _last_activity_touch = time.monotonic() - _ACTIVITY_INTERVAL = 10.0 + _activity_state = { + "last_touch": time.monotonic(), + "start": exec_start, + } while proc.poll() is None: if _is_interrupted(): _kill_process_group(proc) @@ -1141,17 +1143,11 @@ def execute_code( break # Periodic activity touch so the gateway's inactivity timeout # doesn't kill the agent during long code execution (#10807). - _now = time.monotonic() - if _now - _last_activity_touch >= _ACTIVITY_INTERVAL: - _last_activity_touch = _now - try: - from tools.environments.base import _get_activity_callback - _cb = _get_activity_callback() - if _cb: - _elapsed = int(_now - exec_start) - _cb(f"execute_code running ({_elapsed}s elapsed)") - except Exception: - pass + try: + from tools.environments.base import touch_activity_if_due + touch_activity_if_due(_activity_state, "execute_code running") + except Exception: + pass time.sleep(0.2) # Wait for readers to finish draining diff --git a/tools/environments/base.py b/tools/environments/base.py index 19c3bf024..8e9907923 100644 --- a/tools/environments/base.py +++ b/tools/environments/base.py @@ -37,6 +37,32 @@ def _get_activity_callback() -> Callable[[str], None] | None: return getattr(_activity_callback_local, "callback", None) +def touch_activity_if_due( + state: dict, + label: str, +) -> None: + """Fire the activity callback at most once every ``state['interval']`` seconds. + + *state* must contain ``last_touch`` (monotonic timestamp) and ``start`` + (monotonic timestamp of the operation start). An optional ``interval`` + key overrides the default 10 s cadence. + + Swallows all exceptions so callers don't need their own try/except. + """ + now = time.monotonic() + interval = state.get("interval", 10.0) + if now - state["last_touch"] < interval: + return + state["last_touch"] = now + try: + cb = _get_activity_callback() + if cb: + elapsed = int(now - state["start"]) + cb(f"{label} ({elapsed}s elapsed)") + except Exception: + pass + + def get_sandbox_dir() -> Path: """Return the host-side root for all sandbox storage (Docker workspaces, Singularity overlays/SIF cache, etc.). @@ -405,8 +431,11 @@ class BaseEnvironment(ABC): drain_thread = threading.Thread(target=_drain, daemon=True) drain_thread.start() deadline = time.monotonic() + timeout - _last_activity_touch = time.monotonic() - _ACTIVITY_INTERVAL = 10.0 # seconds between activity touches + _now = time.monotonic() + _activity_state = { + "last_touch": _now, + "start": _now, + } while proc.poll() is None: if is_interrupted(): @@ -428,16 +457,7 @@ class BaseEnvironment(ABC): "returncode": 124, } # Periodic activity touch so the gateway knows we're alive - _now = time.monotonic() - if _now - _last_activity_touch >= _ACTIVITY_INTERVAL: - _last_activity_touch = _now - _cb = _get_activity_callback() - if _cb: - try: - _elapsed = int(_now - (deadline - timeout)) - _cb(f"terminal command running ({_elapsed}s elapsed)") - except Exception: - pass + touch_activity_if_due(_activity_state, "terminal command running") time.sleep(0.2) drain_thread.join(timeout=5) diff --git a/tools/environments/modal_utils.py b/tools/environments/modal_utils.py index 161aad261..4d68399e4 100644 --- a/tools/environments/modal_utils.py +++ b/tools/environments/modal_utils.py @@ -105,9 +105,11 @@ class BaseModalExecutionEnvironment(BaseEnvironment): if self._client_timeout_grace_seconds is not None: deadline = time.monotonic() + prepared.timeout + self._client_timeout_grace_seconds - _last_activity_touch = time.monotonic() - _modal_exec_start = time.monotonic() - _ACTIVITY_INTERVAL = 10.0 # match _wait_for_process cadence + _now = time.monotonic() + _activity_state = { + "last_touch": _now, + "start": _now, + } while True: if is_interrupted(): @@ -133,20 +135,11 @@ class BaseModalExecutionEnvironment(BaseEnvironment): return self._timeout_result_for_modal(prepared.timeout) # Periodic activity touch so the gateway knows we're alive - _now = time.monotonic() - if _now - _last_activity_touch >= _ACTIVITY_INTERVAL: - _last_activity_touch = _now - try: - from tools.environments.base import _get_activity_callback - _cb = _get_activity_callback() - except Exception: - _cb = None - if _cb: - try: - _elapsed = int(_now - _modal_exec_start) - _cb(f"modal command running ({_elapsed}s elapsed)") - except Exception: - pass + try: + from tools.environments.base import touch_activity_if_due + touch_activity_if_due(_activity_state, "modal command running") + except Exception: + pass time.sleep(self._poll_interval_seconds)