diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index cc3d64d1360..ce6820abca4 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -136,7 +136,34 @@ class GatewayStreamConsumer: if should_edit and self._accumulated: # Split overflow: if accumulated text exceeds the platform - # limit, finalize the current message and start a new one. + # limit, split into properly sized chunks. + if ( + len(self._accumulated) > _safe_limit + and self._message_id is None + ): + # No existing message to edit (first message or after a + # segment break). Use truncate_message — the same + # helper the non-streaming path uses — to split with + # proper word/code-fence boundaries and chunk + # indicators like "(1/2)". + chunks = self.adapter.truncate_message( + self._accumulated, _safe_limit + ) + for chunk in chunks: + await self._send_new_chunk(chunk, self._message_id) + self._accumulated = "" + self._last_sent_text = "" + self._last_edit_time = time.monotonic() + if got_done: + return + if got_segment_break: + self._message_id = None + self._fallback_final_send = False + self._fallback_prefix = "" + continue + + # Existing message: edit it with the first chunk, then + # start a new message for the overflow remainder. while ( len(self._accumulated) > _safe_limit and self._message_id is not None @@ -226,6 +253,34 @@ class GatewayStreamConsumer: # Strip trailing whitespace/newlines but preserve leading content return cleaned.rstrip() + async def _send_new_chunk(self, text: str, reply_to_id: Optional[str]) -> Optional[str]: + """Send a new message chunk, optionally threaded to a previous message. + + Returns the message_id so callers can thread subsequent chunks. + """ + text = self._clean_for_display(text) + if not text.strip(): + return reply_to_id + try: + meta = dict(self.metadata) if self.metadata else {} + result = await self.adapter.send( + chat_id=self.chat_id, + content=text, + reply_to=reply_to_id, + metadata=meta, + ) + if result.success and result.message_id: + self._message_id = str(result.message_id) + self._already_sent = True + self._last_sent_text = text + return str(result.message_id) + else: + self._edit_supported = False + return reply_to_id + except Exception as e: + logger.error("Stream send chunk error: %s", e) + return reply_to_id + def _visible_prefix(self) -> str: """Return the visible text already shown in the streamed message.""" prefix = self._last_sent_text or ""