From f55c67ac1f632fa966bf02b88e15ec6d25051bc1 Mon Sep 17 00:00:00 2001 From: Maxim Esipov Date: Fri, 15 May 2026 10:48:41 +0300 Subject: [PATCH] fix(gateway): roll over Telegram tool progress bubbles --- gateway/run.py | 112 +++++++++++++++++++++- tests/gateway/test_run_progress_topics.py | 99 +++++++++++++++++++ 2 files changed, 207 insertions(+), 4 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 1941c4aab82..4ac739a7458 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -15276,12 +15276,104 @@ class GatewayRunner: break return - progress_lines = [] # Accumulated tool lines - progress_msg_id = None # ID of the progress message to edit + progress_lines = [] # Accumulated tool lines for the CURRENT editable bubble + progress_msg_id = None # ID of the current progress message to edit can_edit = True # False once an edit fails (platform doesn't support it) _last_edit_ts = 0.0 # Throttle edits to avoid Telegram flood control _PROGRESS_EDIT_INTERVAL = 1.5 # Minimum seconds between edits + _progress_len_fn = ( + adapter.message_len_fn + if isinstance(adapter, BasePlatformAdapter) + else len + ) + try: + _raw_progress_limit = int(getattr(adapter, "MAX_MESSAGE_LENGTH", 4000) or 4000) + except Exception: + _raw_progress_limit = 4000 + # Leave a little room for platform quirks / formatting. For tiny + # test adapters keep the limit usable instead of clamping to 500+. + _PROGRESS_TEXT_LIMIT = max( + 1, + _raw_progress_limit - (64 if _raw_progress_limit > 128 else 0), + ) + + def _progress_text(lines: list) -> str: + return "\n".join(str(line) for line in lines) + + def _split_progress_groups(lines: list) -> list[list]: + """Partition progress lines into platform-sized editable bubbles.""" + groups: list[list] = [] + current: list = [] + for line in lines: + candidate = current + [line] + if current and _progress_len_fn(_progress_text(candidate)) > _PROGRESS_TEXT_LIMIT: + groups.append(current) + current = [line] + else: + current = candidate + if current: + groups.append(current) + return groups + + def _track_progress_result(result) -> None: + if ( + _cleanup_progress + and getattr(result, "success", False) + and getattr(result, "message_id", None) + ): + _cleanup_msg_ids.append(str(result.message_id)) + + async def _send_progress_text(text: str): + result = await adapter.send( + chat_id=source.chat_id, + content=text, + reply_to=_progress_reply_to, + metadata=_progress_metadata, + ) + _track_progress_result(result) + return result + + async def _roll_progress_overflow_if_needed() -> bool: + """Start fresh editable progress bubbles before a bubble exceeds limit. + + Returns True when it delivered/split the current buffer and the + caller should skip the normal send/edit path for this tick. + """ + nonlocal progress_msg_id, progress_lines, can_edit + if not progress_lines or not can_edit: + return False + groups = _split_progress_groups(progress_lines) + if len(groups) <= 1: + return False + + first_text = _progress_text(groups[0]) + if progress_msg_id is not None: + result = await adapter.edit_message( + chat_id=source.chat_id, + message_id=progress_msg_id, + content=first_text, + ) + if not result.success: + can_edit = False + # Fall back to the existing non-edit behavior below. + return False + else: + result = await _send_progress_text(first_text) + if result.success and result.message_id: + progress_msg_id = result.message_id + + for group in groups[1:]: + result = await _send_progress_text(_progress_text(group)) + if result.success and result.message_id: + progress_msg_id = result.message_id + + # The newest continuation is now the only mutable bubble. Keep + # just its lines so subsequent edits update it instead of + # replaying the full historical transcript into new messages. + progress_lines = groups[-1] + return True + while True: try: if not _run_still_current(): @@ -15334,6 +15426,13 @@ class GatewayRunner: msg = raw progress_lines.append(msg) + if await _roll_progress_overflow_if_needed(): + _last_edit_ts = time.monotonic() + await asyncio.sleep(0.3) + if _run_still_current(): + await adapter.send_typing(source.chat_id, metadata=_progress_metadata) + continue + # Throttle edits: batch rapid tool updates into fewer # API calls to avoid hitting Telegram flood control. # (grammY auto-retry pattern: proactively rate-limit @@ -15422,12 +15521,14 @@ class GatewayRunner: _, base_msg, count = raw if progress_lines: progress_lines[-1] = f"{base_msg} (×{count + 1})" + await _roll_progress_overflow_if_needed() elif isinstance(raw, tuple) and len(raw) >= 1 and raw[0] == "__reset__": # Content-bubble marker during drain: close off # the current progress bubble and start a fresh # one for any tool lines that arrived after. + await _roll_progress_overflow_if_needed() if can_edit and progress_lines and progress_msg_id: - _pending_text = "\n".join(progress_lines) + _pending_text = _progress_text(progress_lines) try: await adapter.edit_message( chat_id=source.chat_id, @@ -15442,11 +15543,14 @@ class GatewayRunner: repeat_count[0] = 0 else: progress_lines.append(raw) + await _roll_progress_overflow_if_needed() except Exception: break # Final edit with all remaining tools (only if editing works) if can_edit and progress_lines and progress_msg_id: - full_text = "\n".join(progress_lines) + await _roll_progress_overflow_if_needed() + if can_edit and progress_lines and progress_msg_id: + full_text = _progress_text(progress_lines) try: await adapter.edit_message( chat_id=source.chat_id, diff --git a/tests/gateway/test_run_progress_topics.py b/tests/gateway/test_run_progress_topics.py index fb52e1e5863..b16be006eb0 100644 --- a/tests/gateway/test_run_progress_topics.py +++ b/tests/gateway/test_run_progress_topics.py @@ -58,6 +58,47 @@ class ProgressCaptureAdapter(BasePlatformAdapter): return {"id": chat_id} +class SmallLimitProgressAdapter(ProgressCaptureAdapter): + """Adapter with a tiny platform limit to exercise progress rollover.""" + + MAX_MESSAGE_LENGTH = 180 + + def __init__(self, platform=Platform.TELEGRAM): + super().__init__(platform=platform) + self._next_id = 0 + self.oversized_edits = [] + self.oversized_sends = [] + + def _mint_id(self): + self._next_id += 1 + return f"progress-{self._next_id}" + + async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult: + if len(content) > self.MAX_MESSAGE_LENGTH: + self.oversized_sends.append(content) + self.sent.append( + { + "chat_id": chat_id, + "content": content, + "reply_to": reply_to, + "metadata": metadata, + } + ) + return SendResult(success=True, message_id=self._mint_id()) + + async def edit_message(self, chat_id, message_id, content) -> SendResult: + if len(content) > self.MAX_MESSAGE_LENGTH: + self.oversized_edits.append(content) + self.edits.append( + { + "chat_id": chat_id, + "message_id": message_id, + "content": content, + } + ) + return SendResult(success=True, message_id=message_id) + + class NonEditingProgressCaptureAdapter(ProgressCaptureAdapter): SUPPORTS_MESSAGE_EDITING = False @@ -123,6 +164,31 @@ class DelayedProgressAgent: } +class ManyProgressLinesAgent: + """Emits enough tool-progress lines to exceed a single platform bubble.""" + + def __init__(self, **kwargs): + self.tool_progress_callback = kwargs.get("tool_progress_callback") + self.tools = [] + + def run_conversation(self, message, conversation_history=None, task_id=None): + cb = self.tool_progress_callback + assert cb is not None + cb("tool.started", "terminal", "first-short", {}) + # Let the progress task create the first editable bubble, then enqueue + # the rest quickly. The cancellation drain must roll them into fresh + # editable bubbles instead of trying to edit the first one past limit. + time.sleep(0.35) + for idx in range(1, 8): + cb("tool.started", "terminal", f"overflow-line-{idx}-" + "x" * 45, {}) + time.sleep(0.1) + return { + "final_response": "done", + "messages": [], + "api_calls": 1, + } + + class DelayedInterimAgent: def __init__(self, **kwargs): self.interim_assistant_callback = kwargs.get("interim_assistant_callback") @@ -617,6 +683,39 @@ async def _run_with_agent( return adapter, result +@pytest.mark.asyncio +async def test_run_agent_rolls_progress_bubble_before_platform_limit(monkeypatch, tmp_path): + """Tool progress should start a second editable bubble before Telegram's limit. + + Regression: once the first progress bubble grew past the platform limit, + the gateway kept trying to edit that same oversized full transcript. The + Telegram adapter then split-and-sent a fresh continuation on every update, + causing a noisy trail of one-line messages instead of a new editable bubble. + """ + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + ManyProgressLinesAgent, + session_id="sess-progress-overflow-rollover", + config_data={ + "display": { + "tool_progress": "all", + "interim_assistant_messages": False, + "tool_preview_length": 60, + } + }, + adapter_cls=SmallLimitProgressAdapter, + ) + + assert result["final_response"] == "done" + assert isinstance(adapter, SmallLimitProgressAdapter) + assert len(adapter.sent) >= 2, "expected a fresh progress bubble after the first filled" + assert adapter.oversized_sends == [] + assert adapter.oversized_edits == [] + all_bubbles = [call["content"] for call in adapter.sent + adapter.edits] + assert all(len(text) <= adapter.MAX_MESSAGE_LENGTH for text in all_bubbles) + + @pytest.mark.asyncio async def test_run_agent_surfaces_real_interim_commentary(monkeypatch, tmp_path): adapter, result = await _run_with_agent(