fix(gateway): roll over Telegram tool progress bubbles

This commit is contained in:
Maxim Esipov 2026-05-15 10:48:41 +03:00 committed by Teknium
parent 362ef912ea
commit f55c67ac1f
2 changed files with 207 additions and 4 deletions

View file

@@ -15276,12 +15276,104 @@ class GatewayRunner:
break
return
progress_lines = [] # Accumulated tool lines
progress_msg_id = None # ID of the progress message to edit
# Mutable state shared by the progress helpers defined after this point.
progress_lines = [] # Accumulated tool lines for the CURRENT editable bubble
progress_msg_id = None # ID of the current progress message to edit
can_edit = True # False once an edit fails (platform doesn't support it)
_last_edit_ts = 0.0 # Throttle edits to avoid Telegram flood control
_PROGRESS_EDIT_INTERVAL = 1.5 # Minimum seconds between edits
# Measure progress text with the adapter's own length function when the
# adapter is a BasePlatformAdapter; otherwise fall back to the builtin len().
_progress_len_fn = (
adapter.message_len_fn
if isinstance(adapter, BasePlatformAdapter)
else len
)
# Read the adapter's MAX_MESSAGE_LENGTH, defaulting to 4000 when the attribute
# is missing, falsy, or cannot be converted to int.
try:
_raw_progress_limit = int(getattr(adapter, "MAX_MESSAGE_LENGTH", 4000) or 4000)
except Exception:
_raw_progress_limit = 4000
# Leave a little room for platform quirks / formatting. For tiny
# test adapters keep the limit usable instead of clamping to 500+.
# Concretely: limits above 128 lose a 64-unit margin, smaller limits are kept
# as-is, and the resulting limit is never lower than 1.
_PROGRESS_TEXT_LIMIT = max(
1,
_raw_progress_limit - (64 if _raw_progress_limit > 128 else 0),
)
def _progress_text(lines: list) -> str:
return "\n".join(str(line) for line in lines)
def _split_progress_groups(lines: list) -> list[list]:
    """Partition progress lines into platform-sized editable bubbles.

    Lines are packed greedily and in order: a line joins the bubble being
    built unless the joined text would measure more than
    ``_PROGRESS_TEXT_LIMIT`` according to ``_progress_len_fn``, in which case
    the bubble is closed and the line opens the next one. A line is always
    accepted into an empty bubble without being measured, so a single line
    longer than the limit ends up alone in its own group.

    Returns:
        The groups in order; an empty list when ``lines`` is empty.
    """
    bubbles: list[list] = []
    pending: list = []
    for entry in lines:
        if not pending:
            # First line of a bubble is taken unconditionally (no length check).
            pending = [entry]
            continue
        extended = [*pending, entry]
        if _progress_len_fn(_progress_text(extended)) > _PROGRESS_TEXT_LIMIT:
            bubbles.append(pending)
            pending = [entry]
        else:
            pending = extended
    if pending:
        bubbles.append(pending)
    return bubbles
def _track_progress_result(result) -> None:
    """Remember a sent progress message for later cleanup.

    The message id is appended (as a string) to ``_cleanup_msg_ids`` only
    when ``_cleanup_progress`` is truthy and ``result`` reports both a truthy
    ``success`` and a truthy ``message_id``. Missing attributes count as
    falsy, so arbitrary result objects are tolerated.
    """
    if not _cleanup_progress:
        return
    if not getattr(result, "success", False):
        return
    if not getattr(result, "message_id", None):
        return
    _cleanup_msg_ids.append(str(result.message_id))
async def _send_progress_text(text: str):
    """Send ``text`` as a new progress message and return the adapter result.

    The message goes to ``source.chat_id`` using the shared progress reply
    target and metadata. The result is handed to ``_track_progress_result``
    before being returned to the caller.
    """
    send_kwargs = {
        "chat_id": source.chat_id,
        "content": text,
        "reply_to": _progress_reply_to,
        "metadata": _progress_metadata,
    }
    outcome = await adapter.send(**send_kwargs)
    _track_progress_result(outcome)
    return outcome
async def _roll_progress_overflow_if_needed() -> bool:
"""Start fresh editable progress bubbles before a bubble exceeds limit.
Returns True when it delivered/split the current buffer and the
caller should skip the normal send/edit path for this tick.
"""
nonlocal progress_msg_id, progress_lines, can_edit
if not progress_lines or not can_edit:
return False
groups = _split_progress_groups(progress_lines)
if len(groups) <= 1:
return False
first_text = _progress_text(groups[0])
if progress_msg_id is not None:
result = await adapter.edit_message(
chat_id=source.chat_id,
message_id=progress_msg_id,
content=first_text,
)
if not result.success:
can_edit = False
# Fall back to the existing non-edit behavior below.
return False
else:
result = await _send_progress_text(first_text)
if result.success and result.message_id:
progress_msg_id = result.message_id
for group in groups[1:]:
result = await _send_progress_text(_progress_text(group))
if result.success and result.message_id:
progress_msg_id = result.message_id
# The newest continuation is now the only mutable bubble. Keep
# just its lines so subsequent edits update it instead of
# replaying the full historical transcript into new messages.
progress_lines = groups[-1]
return True
while True:
try:
if not _run_still_current():
@@ -15334,6 +15426,13 @@ class GatewayRunner:
msg = raw
progress_lines.append(msg)
if await _roll_progress_overflow_if_needed():
_last_edit_ts = time.monotonic()
await asyncio.sleep(0.3)
if _run_still_current():
await adapter.send_typing(source.chat_id, metadata=_progress_metadata)
continue
# Throttle edits: batch rapid tool updates into fewer
# API calls to avoid hitting Telegram flood control.
# (grammY auto-retry pattern: proactively rate-limit
@@ -15422,12 +15521,14 @@ class GatewayRunner:
_, base_msg, count = raw
if progress_lines:
progress_lines[-1] = f"{base_msg} (×{count + 1})"
await _roll_progress_overflow_if_needed()
elif isinstance(raw, tuple) and len(raw) >= 1 and raw[0] == "__reset__":
# Content-bubble marker during drain: close off
# the current progress bubble and start a fresh
# one for any tool lines that arrived after.
await _roll_progress_overflow_if_needed()
if can_edit and progress_lines and progress_msg_id:
_pending_text = "\n".join(progress_lines)
_pending_text = _progress_text(progress_lines)
try:
await adapter.edit_message(
chat_id=source.chat_id,
@@ -15442,11 +15543,14 @@ class GatewayRunner:
repeat_count[0] = 0
else:
progress_lines.append(raw)
await _roll_progress_overflow_if_needed()
except Exception:
break
# Final edit with all remaining tools (only if editing works)
if can_edit and progress_lines and progress_msg_id:
full_text = "\n".join(progress_lines)
await _roll_progress_overflow_if_needed()
if can_edit and progress_lines and progress_msg_id:
full_text = _progress_text(progress_lines)
try:
await adapter.edit_message(
chat_id=source.chat_id,

View file

@@ -58,6 +58,47 @@ class ProgressCaptureAdapter(BasePlatformAdapter):
return {"id": chat_id}
class SmallLimitProgressAdapter(ProgressCaptureAdapter):
    """Adapter with a tiny platform limit to exercise progress rollover.

    Every send and edit is recorded and reported as successful. Payloads
    longer than ``MAX_MESSAGE_LENGTH`` are additionally collected in
    ``oversized_sends`` / ``oversized_edits`` so tests can assert that none
    occurred.
    """

    MAX_MESSAGE_LENGTH = 180

    def __init__(self, platform=Platform.TELEGRAM):
        super().__init__(platform=platform)
        self._next_id = 0
        self.oversized_edits = []
        self.oversized_sends = []

    def _mint_id(self):
        """Return the next sequential message id (``progress-1``, ``progress-2``, ...)."""
        self._next_id += 1
        return f"progress-{self._next_id}"

    async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult:
        """Record a sent message and succeed with a freshly minted id."""
        too_long = len(content) > self.MAX_MESSAGE_LENGTH
        if too_long:
            self.oversized_sends.append(content)
        record = {
            "chat_id": chat_id,
            "content": content,
            "reply_to": reply_to,
            "metadata": metadata,
        }
        self.sent.append(record)
        return SendResult(success=True, message_id=self._mint_id())

    async def edit_message(self, chat_id, message_id, content) -> SendResult:
        """Record an edit and succeed, echoing back the edited message id."""
        too_long = len(content) > self.MAX_MESSAGE_LENGTH
        if too_long:
            self.oversized_edits.append(content)
        record = {
            "chat_id": chat_id,
            "message_id": message_id,
            "content": content,
        }
        self.edits.append(record)
        return SendResult(success=True, message_id=message_id)
class NonEditingProgressCaptureAdapter(ProgressCaptureAdapter):
SUPPORTS_MESSAGE_EDITING = False
@@ -123,6 +164,31 @@ class DelayedProgressAgent:
}
class ManyProgressLinesAgent:
    """Emits enough tool-progress lines to exceed a single platform bubble."""

    def __init__(self, **kwargs):
        # Only the progress callback is read; other keyword arguments are ignored.
        self.tool_progress_callback = kwargs.get("tool_progress_callback")
        self.tools = []

    def run_conversation(self, message, conversation_history=None, task_id=None):
        """Emit one short progress line, then seven long ones, and finish.

        Returns a fixed result dict with ``final_response`` set to ``"done"``.
        """
        notify = self.tool_progress_callback
        assert notify is not None

        notify("tool.started", "terminal", "first-short", {})
        # Let the progress task create the first editable bubble, then enqueue
        # the rest quickly. The cancellation drain must roll them into fresh
        # editable bubbles instead of trying to edit the first one past limit.
        time.sleep(0.35)

        padding = "x" * 45
        for idx in range(1, 8):
            notify("tool.started", "terminal", f"overflow-line-{idx}-" + padding, {})
            time.sleep(0.1)

        outcome = {
            "final_response": "done",
            "messages": [],
            "api_calls": 1,
        }
        return outcome
class DelayedInterimAgent:
def __init__(self, **kwargs):
self.interim_assistant_callback = kwargs.get("interim_assistant_callback")
@@ -617,6 +683,39 @@ async def _run_with_agent(
return adapter, result
@pytest.mark.asyncio
async def test_run_agent_rolls_progress_bubble_before_platform_limit(monkeypatch, tmp_path):
    """Tool progress must roll into a new editable bubble before the platform limit.

    Regression: after the first progress bubble outgrew the platform limit,
    the gateway kept editing that same bubble with the oversized full
    transcript. The Telegram adapter then split-and-sent a new continuation
    on every update, leaving a noisy trail of one-line messages instead of a
    single fresh editable bubble.
    """
    display_config = {
        "tool_progress": "all",
        "interim_assistant_messages": False,
        "tool_preview_length": 60,
    }
    adapter, result = await _run_with_agent(
        monkeypatch,
        tmp_path,
        ManyProgressLinesAgent,
        session_id="sess-progress-overflow-rollover",
        config_data={"display": display_config},
        adapter_cls=SmallLimitProgressAdapter,
    )

    assert result["final_response"] == "done"
    assert isinstance(adapter, SmallLimitProgressAdapter)
    assert len(adapter.sent) >= 2, "expected a fresh progress bubble after the first filled"

    # Nothing handed to the adapter may exceed its limit, whether sent or edited.
    assert adapter.oversized_sends == []
    assert adapter.oversized_edits == []
    for call in adapter.sent + adapter.edits:
        assert len(call["content"]) <= adapter.MAX_MESSAGE_LENGTH
@pytest.mark.asyncio
async def test_run_agent_surfaces_real_interim_commentary(monkeypatch, tmp_path):
adapter, result = await _run_with_agent(