mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-23 10:42:00 +00:00
fix(compression): protect the summary call from mid-flight interrupts
Context compression is atomic, but a gateway interrupt (an incoming user message while the agent is busy) could abort the in-flight summary call. The Codex Responses aux stream polls the thread interrupt flag and used to raise InterruptedError unconditionally — so compression fell back to a degraded static 'summary unavailable' marker, losing the real handoff (#23975). Add a thread-local interrupt-protection flag (aux_interrupt_protection context manager) in auxiliary_client; the Codex stream's cancellation check honors it. The compressor wraps its summary call_llm in the context manager. Timeouts still fire (a hung call must die) and all other aux tasks (vision, web_extract, title_generation, …) stay interruptible. Re-entrant, so the main-model retry recursion is safe. Co-authored-by: konsisumer <der@konsi.org>
This commit is contained in:
parent
4b7f9a4d30
commit
2f3177adf4
3 changed files with 142 additions and 3 deletions
|
|
@ -40,6 +40,7 @@ Payment / credit exhaustion fallback:
|
|||
their OpenRouter balance but has Codex OAuth or another provider available.
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
|
@ -107,6 +108,39 @@ from utils import base_url_host_matches, base_url_hostname, env_float, model_for
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ── Interrupt protection for atomic auxiliary tasks ──────────────────────
|
||||
# Some auxiliary tasks must NOT be aborted mid-flight by a gateway interrupt
|
||||
# (e.g. an incoming user message while the agent is busy). Context
|
||||
# compression is the prime case: if the summary LLM call is interrupted
|
||||
# part-way, compression falls back to a static "summary unavailable" marker
|
||||
# and the real handoff is lost (#23975). A thread-local flag lets such a
|
||||
# task mark its in-flight LLM call as interrupt-protected; the Codex
|
||||
# Responses stream's cancellation check honors it. TIMEOUTS still fire
|
||||
# (a hung call must die), and all OTHER aux tasks (vision, web_extract,
|
||||
# title_generation, …) remain freely interruptible.
|
||||
_aux_interrupt_protection = threading.local()
|
||||
|
||||
|
||||
def _aux_interrupt_protected() -> bool:
    """Return True when the calling thread's ``active`` protection flag is truthy.

    A thread that has never set the flag reads as unprotected (False).
    """
    flag = getattr(_aux_interrupt_protection, "active", False)
    return bool(flag)
|
||||
|
||||
|
||||
@contextlib.contextmanager
def aux_interrupt_protection(active: bool = True):
    """Set the calling thread's interrupt-protection flag for a ``with`` block.

    Atomic auxiliary tasks (compression) wrap their LLM call in this so a
    gateway interrupt arriving mid-flight does not abort the call and force
    a degraded fallback.

    Args:
        active: Value the flag holds inside the block (defaults to True).

    The value seen on entry is saved and written back on exit, whether the
    block finishes normally or raises, so nested uses leave the enclosing
    scope's setting untouched.
    """
    state = _aux_interrupt_protection
    outer_value = getattr(state, "active", False)
    state.active = active
    try:
        yield
    finally:
        # Hand back whatever the enclosing scope had, not a hard-coded False.
        state.active = outer_value
|
||||
|
||||
|
||||
def _safe_isinstance(obj: Any, maybe_type: Any) -> bool:
|
||||
"""Return False instead of raising when a patched symbol is not a type."""
|
||||
try:
|
||||
|
|
@ -805,7 +839,11 @@ class _CodexCompletionsAdapter:
|
|||
raise TimeoutError(_timeout_message())
|
||||
try:
|
||||
from tools.interrupt import is_interrupted
|
||||
if is_interrupted():
|
||||
# Honor interrupt protection for atomic aux tasks (compression):
|
||||
# a mid-flight gateway interrupt must NOT abort the summary call
|
||||
# and trigger a degraded fallback marker (#23975). Timeouts above
|
||||
# still fire; other aux tasks remain interruptible.
|
||||
if is_interrupted() and not _aux_interrupt_protected():
|
||||
raise InterruptedError("Codex auxiliary Responses stream interrupted")
|
||||
except InterruptedError:
|
||||
raise
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ import re
|
|||
import time
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from agent.auxiliary_client import call_llm, _is_connection_error
|
||||
from agent.auxiliary_client import call_llm, _is_connection_error, aux_interrupt_protection
|
||||
from agent.context_engine import ContextEngine
|
||||
from agent.model_metadata import (
|
||||
MINIMUM_CONTEXT_LENGTH,
|
||||
|
|
@ -1519,7 +1519,13 @@ This compaction should PRIORITISE preserving all information related to the focu
|
|||
}
|
||||
if self.summary_model:
|
||||
call_kwargs["model"] = self.summary_model
|
||||
response = call_llm(**call_kwargs)
|
||||
# Compression is atomic: protect the in-flight summary call from a
|
||||
# mid-turn gateway interrupt. Without this, an incoming user message
|
||||
# aborts the summary and compression falls back to a degraded static
|
||||
# marker, losing the real handoff (#23975). Re-entrant: a main-model
|
||||
# retry (_generate_summary recursion) re-enters harmlessly.
|
||||
with aux_interrupt_protection():
|
||||
response = call_llm(**call_kwargs)
|
||||
content = response.choices[0].message.content
|
||||
# Handle cases where content is not a string (e.g., dict from llama.cpp)
|
||||
if not isinstance(content, str):
|
||||
|
|
|
|||
95
tests/agent/test_compression_interrupt_protection.py
Normal file
95
tests/agent/test_compression_interrupt_protection.py
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
"""Regression for #23975: context compression must survive a mid-flight
|
||||
gateway interrupt.
|
||||
|
||||
While the compression summary LLM call is in flight, an incoming gateway
|
||||
message sets the thread interrupt flag. The Codex Responses aux stream polls
|
||||
that flag and used to raise InterruptedError unconditionally — aborting the
|
||||
summary, which then fell back to a degraded static "summary unavailable"
|
||||
marker (losing the real handoff). Compression now runs its summary call
|
||||
under aux_interrupt_protection(), so the interrupt poll is masked for the
|
||||
compression task only (timeouts and other aux tasks stay interruptible).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
import agent.auxiliary_client as aux
|
||||
|
||||
|
||||
class TestAuxInterruptProtection:
    """Checks for the interrupt-protection flag and its context manager."""

    def test_protected_flag_defaults_false(self):
        # With no protection context entered, the flag reads False.
        is_protected = aux._aux_interrupt_protected
        assert is_protected() is False

    def test_context_manager_sets_and_restores(self):
        protect = aux.aux_interrupt_protection
        is_protected = aux._aux_interrupt_protected

        assert is_protected() is False
        with protect():
            assert is_protected() is True
        assert is_protected() is False

    def test_context_manager_is_reentrant(self):
        protect = aux.aux_interrupt_protection
        is_protected = aux._aux_interrupt_protected

        with protect():
            assert is_protected() is True
            with protect():
                assert is_protected() is True
            # Leaving the inner block must keep the outer protection in place.
            assert is_protected() is True
        assert is_protected() is False

    def test_restores_on_exception(self):
        is_protected = aux._aux_interrupt_protected

        try:
            with aux.aux_interrupt_protection():
                raise ValueError("boom")
        except ValueError:
            pass
        # The flag is reset even though the block exited via an exception.
        assert is_protected() is False

    def test_explicit_inactive_is_noop(self):
        is_protected = aux._aux_interrupt_protected

        with aux.aux_interrupt_protection(active=False):
            assert is_protected() is False
|
||||
|
||||
|
||||
class TestCompressionProtectsSummaryCall:
    """Verify the compressor issues its summary call_llm while
    aux_interrupt_protection is active, so a mid-flight interrupt cannot
    abort it (#23975)."""

    def test_compressor_call_site_uses_protection(self):
        # Record the protection flag at the instant call_llm is invoked and
        # require it to be ACTIVE then, and cleared again afterwards.
        from agent.context_compressor import ContextCompressor

        class _Msg:
            content = "[CONTEXT SUMMARY]: ok"

        class _Choice:
            message = _Msg()

        class _Resp:
            choices = [_Choice()]

        observed = {}

        def fake_call_llm(**kwargs):
            # Snapshot the call's task name and the flag state during the call.
            observed["task"] = kwargs.get("task")
            observed["protected"] = aux._aux_interrupt_protected()
            return _Resp()

        conversation = [
            {"role": "user", "content": "do a thing"},
            {"role": "assistant", "content": "working"},
            {"role": "user", "content": "more"},
            {"role": "assistant", "content": "done"},
        ]

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            compressor = ContextCompressor(model="test", quiet_mode=True)

        with patch("agent.context_compressor.call_llm", side_effect=fake_call_llm):
            summary = compressor._generate_summary(conversation)

        assert summary is not None
        assert observed.get("task") == "compression"
        assert observed.get("protected") is True, (
            "compression summary call must run under aux_interrupt_protection"
        )
        # Once the call has returned, the flag must be back to unprotected.
        assert aux._aux_interrupt_protected() is False
|
||||
Loading…
Add table
Add a link
Reference in a new issue