From 2f3177adf46d125cd5a2e6613b14ab72938deb9e Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sat, 20 Jun 2026 20:27:04 -0700 Subject: [PATCH] fix(compression): protect the summary call from mid-flight interrupts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Context compression is atomic, but a gateway interrupt (an incoming user message while the agent is busy) could abort the in-flight summary call. The Codex Responses aux stream polls the thread interrupt flag and raised InterruptedError unconditionally — so compression fell back to a degraded static 'summary unavailable' marker, losing the real handoff (#23975). Add a thread-local interrupt-protection flag (aux_interrupt_protection context manager) in auxiliary_client; the Codex stream's cancellation check honors it. The compressor wraps its summary call_llm in the context manager. Timeouts still fire (a hung call must die) and all other aux tasks (vision, web_extract, title_generation, …) stay interruptible. Re-entrant, so the main-model retry recursion is safe. Co-authored-by: konsisumer --- agent/auxiliary_client.py | 40 +++++++- agent/context_compressor.py | 10 +- .../test_compression_interrupt_protection.py | 95 +++++++++++++++++++ 3 files changed, 142 insertions(+), 3 deletions(-) create mode 100644 tests/agent/test_compression_interrupt_protection.py diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py index 0af56a7473d..4bc9440df31 100644 --- a/agent/auxiliary_client.py +++ b/agent/auxiliary_client.py @@ -40,6 +40,7 @@ Payment / credit exhaustion fallback: their OpenRouter balance but has Codex OAuth or another provider available. """ +import contextlib import json import logging import os @@ -107,6 +108,39 @@ from utils import base_url_host_matches, base_url_hostname, env_float, model_for logger = logging.getLogger(__name__) +# ── Interrupt protection for atomic auxiliary tasks ────────────────────── +# Some auxiliary tasks must NOT be aborted mid-flight by a gateway interrupt +# (e.g. an incoming user message while the agent is busy). Context +# compression is the prime case: if the summary LLM call is interrupted +# part-way, compression falls back to a static "summary unavailable" marker +# and the real handoff is lost (#23975). A thread-local flag lets such a +# task mark its in-flight LLM call as interrupt-protected; the Codex +# Responses stream's cancellation check honors it. TIMEOUTS still fire +# (a hung call must die), and all OTHER aux tasks (vision, web_extract, +# title_generation, …) remain freely interruptible. +_aux_interrupt_protection = threading.local() + + +def _aux_interrupt_protected() -> bool: + return bool(getattr(_aux_interrupt_protection, "active", False)) + + +@contextlib.contextmanager +def aux_interrupt_protection(active: bool = True): + """Mark the current thread's auxiliary LLM call as interrupt-protected. + + Used by atomic aux tasks (compression) so a mid-flight gateway interrupt + doesn't abort the call and trigger a degraded fallback. Re-entrant-safe: + restores the previous value on exit. + """ + prev = getattr(_aux_interrupt_protection, "active", False) + _aux_interrupt_protection.active = active + try: + yield + finally: + _aux_interrupt_protection.active = prev + + def _safe_isinstance(obj: Any, maybe_type: Any) -> bool: """Return False instead of raising when a patched symbol is not a type.""" try: @@ -805,7 +839,11 @@ class _CodexCompletionsAdapter: raise TimeoutError(_timeout_message()) try: from tools.interrupt import is_interrupted - if is_interrupted(): + # Honor interrupt protection for atomic aux tasks (compression): + # a mid-flight gateway interrupt must NOT abort the summary call + # and trigger a degraded fallback marker (#23975). Timeouts above + # still fire; other aux tasks remain interruptible. + if is_interrupted() and not _aux_interrupt_protected(): raise InterruptedError("Codex auxiliary Responses stream interrupted") except InterruptedError: raise diff --git a/agent/context_compressor.py b/agent/context_compressor.py index 8d1bfebd5ff..88c0a61e922 100644 --- a/agent/context_compressor.py +++ b/agent/context_compressor.py @@ -23,7 +23,7 @@ import re import time from typing import Any, Dict, List, Optional -from agent.auxiliary_client import call_llm, _is_connection_error +from agent.auxiliary_client import call_llm, _is_connection_error, aux_interrupt_protection from agent.context_engine import ContextEngine from agent.model_metadata import ( MINIMUM_CONTEXT_LENGTH, @@ -1519,7 +1519,13 @@ This compaction should PRIORITISE preserving all information related to the focu } if self.summary_model: call_kwargs["model"] = self.summary_model - response = call_llm(**call_kwargs) + # Compression is atomic: protect the in-flight summary call from a + # mid-turn gateway interrupt. Without this, an incoming user message + # aborts the summary and compression falls back to a degraded static + # marker, losing the real handoff (#23975). Re-entrant: a main-model + # retry (_generate_summary recursion) re-enters harmlessly. + with aux_interrupt_protection(): + response = call_llm(**call_kwargs) content = response.choices[0].message.content # Handle cases where content is not a string (e.g., dict from llama.cpp) if not isinstance(content, str): diff --git a/tests/agent/test_compression_interrupt_protection.py b/tests/agent/test_compression_interrupt_protection.py new file mode 100644 index 00000000000..1a6a6921af9 --- /dev/null +++ b/tests/agent/test_compression_interrupt_protection.py @@ -0,0 +1,95 @@ +"""Regression for #23975: context compression must survive a mid-flight +gateway interrupt. + +While the compression summary LLM call is in flight, an incoming gateway +message sets the thread interrupt flag. The Codex Responses aux stream polls +that flag and used to raise InterruptedError unconditionally — aborting the +summary, which then fell back to a degraded static "summary unavailable" +marker (losing the real handoff). Compression now runs its summary call +under aux_interrupt_protection(), so the interrupt poll is masked for the +compression task only (timeouts and other aux tasks stay interruptible). +""" + +from __future__ import annotations + +from unittest.mock import patch + +import agent.auxiliary_client as aux + + +class TestAuxInterruptProtection: + def test_protected_flag_defaults_false(self): + # Fresh thread-local state. + assert aux._aux_interrupt_protected() is False + + def test_context_manager_sets_and_restores(self): + assert aux._aux_interrupt_protected() is False + with aux.aux_interrupt_protection(): + assert aux._aux_interrupt_protected() is True + assert aux._aux_interrupt_protected() is False + + def test_context_manager_is_reentrant(self): + with aux.aux_interrupt_protection(): + assert aux._aux_interrupt_protected() is True + with aux.aux_interrupt_protection(): + assert aux._aux_interrupt_protected() is True + # inner exit must NOT clear protection while still inside outer + assert aux._aux_interrupt_protected() is True + assert aux._aux_interrupt_protected() is False + + def test_restores_on_exception(self): + try: + with aux.aux_interrupt_protection(): + raise ValueError("boom") + except ValueError: + pass + assert aux._aux_interrupt_protected() is False + + def test_explicit_inactive_is_noop(self): + with aux.aux_interrupt_protection(active=False): + assert aux._aux_interrupt_protected() is False + + +class TestCompressionProtectsSummaryCall: + """The compressor must wrap its summary call_llm in aux_interrupt_protection + so a mid-flight interrupt doesn't abort it (#23975).""" + + def test_compressor_call_site_uses_protection(self): + # The summary call must run inside aux_interrupt_protection. We assert + # the protection flag is ACTIVE at the moment call_llm is invoked. + from agent.context_compressor import ContextCompressor + + seen = {} + + class _Resp: + class _Choice: + class _Msg: + content = "[CONTEXT SUMMARY]: ok" + message = _Msg() + choices = [_Choice()] + + def fake_call_llm(**kwargs): + # Capture whether protection was active during the call. + seen["protected"] = aux._aux_interrupt_protected() + seen["task"] = kwargs.get("task") + return _Resp() + + with patch("agent.context_compressor.get_model_context_length", return_value=100000): + c = ContextCompressor(model="test", quiet_mode=True) + + msgs = [ + {"role": "user", "content": "do a thing"}, + {"role": "assistant", "content": "working"}, + {"role": "user", "content": "more"}, + {"role": "assistant", "content": "done"}, + ] + with patch("agent.context_compressor.call_llm", side_effect=fake_call_llm): + summary = c._generate_summary(msgs) + + assert summary is not None + assert seen.get("task") == "compression" + assert seen.get("protected") is True, ( + "compression summary call must run under aux_interrupt_protection" + ) + # Protection must be cleared after the call returns. + assert aux._aux_interrupt_protected() is False