fix: /stop now immediately aborts streaming retry loop

When a user sends /stop during a streaming API call, the outer poll loop
detects _interrupt_requested and closes the HTTP connection. However, the
inner _call() thread catches the connection error and enters its retry
loop — opening a FRESH connection without checking the interrupt flag.

On slow providers like ollama-cloud, each retry attempt blocks for the
full stream-read timeout (120s+). With 3 retry attempts this caused
510+ second delays between /stop and actual response — the agent appeared
completely unresponsive despite the stop being acknowledged.

Fix: add an _interrupt_requested check at the top of the streaming retry
loop so the agent exits immediately instead of retrying.

Also fix log truncation: all session key logging in gateway/run.py used
[:20] or [:30] slices, which truncated 'agent:main:telegram:dm:5690190437'
(33 chars) to 'agent:main:telegram:' — losing the identifying chat type
and user ID. Replace with full keys to make logs debuggable.

Reported by user Sidharth Pulipaka via Telegram on ollama-cloud provider.
This commit is contained in:
kshitijk4poor 2026-04-25 17:44:32 +05:30 committed by Teknium
parent 5006b2204b
commit 7c17accb29
3 changed files with 201 additions and 29 deletions

View file

@ -0,0 +1,162 @@
"""Tests that /stop interrupts streaming retry loops immediately.
When the agent is interrupted during a streaming API call, the outer poll
loop closes the HTTP connection. The inner `_call()` thread sees a
connection error and enters its retry loop. Before this fix, the retry
loop would open a FRESH connection without checking `_interrupt_requested`,
making /stop take multiple retry cycles × read-timeout to actually stop
(510+ seconds observed on slow ollama-cloud providers).
The fix adds an `_interrupt_requested` check at the top of the retry loop
so the agent exits immediately instead of retrying.
"""
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
def _make_agent(**kwargs):
"""Create a minimal AIAgent for streaming tests."""
from run_agent import AIAgent
defaults = dict(
api_key="test-key",
base_url="https://example.com/v1",
model="test/model",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
defaults.update(kwargs)
agent = AIAgent(**defaults)
agent.api_mode = "chat_completions"
return agent
class TestStreamInterruptBeforeRetry:
"""Verify _interrupt_requested is checked before each streaming retry."""
@pytest.mark.filterwarnings(
"ignore::pytest.PytestUnhandledThreadExceptionWarning"
)
@patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client")
def test_interrupt_prevents_stream_retry(self, mock_close, mock_create):
"""When _interrupt_requested is set during a transient stream error,
the retry loop must NOT retry it should raise InterruptedError
immediately instead of opening a fresh connection."""
import httpx
attempt_count = [0]
def fail_once_then_interrupt(*args, **kwargs):
attempt_count[0] += 1
if attempt_count[0] == 1:
# First attempt: simulate normal failure, then set interrupt
# (as if /stop arrived while the retry loop processes the error)
agent._interrupt_requested = True
raise httpx.ConnectError("connection reset by /stop")
# Should never reach here — the interrupt check should fire first
raise httpx.ConnectError("unexpected retry — interrupt not checked!")
mock_client = MagicMock()
mock_client.chat.completions.create.side_effect = fail_once_then_interrupt
mock_create.return_value = mock_client
agent = _make_agent()
agent._interrupt_requested = False
with pytest.raises(InterruptedError, match="interrupted"):
agent._interruptible_streaming_api_call({})
# Only 1 attempt should have been made — the interrupt should prevent retry
assert attempt_count[0] == 1, (
f"Expected 1 attempt but got {attempt_count[0]}. "
"The retry loop retried despite _interrupt_requested being set."
)
@pytest.mark.filterwarnings(
"ignore::pytest.PytestUnhandledThreadExceptionWarning"
)
@patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client")
def test_interrupt_before_first_attempt(self, mock_close, mock_create):
"""If _interrupt_requested is already set when the streaming call
starts, it should exit immediately without making any API call."""
mock_client = MagicMock()
mock_create.return_value = mock_client
agent = _make_agent()
agent._interrupt_requested = True # Pre-set before call
with pytest.raises(InterruptedError, match="interrupted"):
agent._interruptible_streaming_api_call({})
# No API call should have been made at all
assert mock_client.chat.completions.create.call_count == 0
@patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client")
def test_normal_retry_still_works_without_interrupt(self, mock_close, mock_create):
"""Without an interrupt, transient errors should still retry normally."""
import httpx
attempts = [0]
def fail_twice_then_succeed(*args, **kwargs):
attempts[0] += 1
if attempts[0] <= 2:
raise httpx.ConnectError("transient failure")
# Third attempt succeeds
chunks = [
SimpleNamespace(
choices=[
SimpleNamespace(
index=0,
delta=SimpleNamespace(
content="ok",
tool_calls=None,
reasoning_content=None,
reasoning=None,
),
finish_reason=None,
)
],
model="test/model",
usage=None,
),
SimpleNamespace(
choices=[
SimpleNamespace(
index=0,
delta=SimpleNamespace(
content=None,
tool_calls=None,
reasoning_content=None,
reasoning=None,
),
finish_reason="stop",
)
],
model="test/model",
usage=None,
),
]
stream = MagicMock()
stream.__iter__ = MagicMock(return_value=iter(chunks))
stream.response = MagicMock()
stream.response.headers = {}
return stream
mock_client = MagicMock()
mock_client.chat.completions.create.side_effect = fail_twice_then_succeed
mock_create.return_value = mock_client
agent = _make_agent()
agent._interrupt_requested = False
# Should succeed on the third attempt
result = agent._interruptible_streaming_api_call({})
assert result is not None
assert attempts[0] == 3