diff --git a/gateway/config.py b/gateway/config.py index bde52eb559..d2dc45eaec 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -190,7 +190,7 @@ class StreamingConfig: """Configuration for real-time token streaming to messaging platforms.""" enabled: bool = False transport: str = "edit" # "edit" (progressive editMessageText) or "off" - edit_interval: float = 0.3 # Seconds between message edits + edit_interval: float = 1.0 # Seconds between message edits (Telegram rate-limits at ~1/s) buffer_threshold: int = 40 # Chars before forcing an edit cursor: str = " ▉" # Cursor shown during streaming @@ -210,7 +210,7 @@ class StreamingConfig: return cls( enabled=data.get("enabled", False), transport=data.get("transport", "edit"), - edit_interval=float(data.get("edit_interval", 0.3)), + edit_interval=float(data.get("edit_interval", 1.0)), buffer_threshold=int(data.get("buffer_threshold", 40)), cursor=data.get("cursor", " ▉"), ) diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 5453df60e8..de0a1453b9 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -36,7 +36,7 @@ _NEW_SEGMENT = object() @dataclass class StreamConsumerConfig: """Runtime config for a single stream consumer instance.""" - edit_interval: float = 0.3 + edit_interval: float = 1.0 buffer_threshold: int = 40 cursor: str = " ▉" @@ -56,6 +56,10 @@ class GatewayStreamConsumer: await task # wait for final edit """ + # After this many consecutive flood-control failures, permanently disable + # progressive edits for the remainder of the stream. + _MAX_FLOOD_STRIKES = 3 + def __init__( self, adapter: Any, @@ -76,6 +80,8 @@ class GatewayStreamConsumer: self._last_sent_text = "" # Track last-sent text to skip redundant edits self._fallback_final_send = False self._fallback_prefix = "" + self._flood_strikes = 0 # Consecutive flood-control edit failures + self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff @property def already_sent(self) -> bool: @@ -129,7 +135,7 @@ class GatewayStreamConsumer: should_edit = ( got_done or got_segment_break - or (elapsed >= self.cfg.edit_interval + or (elapsed >= self._current_edit_interval and self._accumulated) or len(self._accumulated) >= self.cfg.buffer_threshold ) @@ -173,12 +179,13 @@ class GatewayStreamConsumer: if split_at < _safe_limit // 2: split_at = _safe_limit chunk = self._accumulated[:split_at] - await self._send_or_edit(chunk) - if self._fallback_final_send: - # Edit failed while attempting to split an oversized - # message. Keep the full accumulated text intact so - # the fallback final-send path can deliver the - # remaining continuation without dropping content. + ok = await self._send_or_edit(chunk) + if self._fallback_final_send or not ok: + # Edit failed (or backed off due to flood control) + # while attempting to split an oversized message. + # Keep the full accumulated text intact so the + # fallback final-send path can deliver the remaining + # continuation without dropping content. break self._accumulated = self._accumulated[split_at:].lstrip("\n") self._message_id = None @@ -322,7 +329,10 @@ class GatewayStreamConsumer: return chunks async def _send_fallback_final(self, text: str) -> None: - """Send the final continuation after streaming edits stop working.""" + """Send the final continuation after streaming edits stop working. + + Retries each chunk once on flood-control failures with a short delay. + """ final_text = self._clean_for_display(text) continuation = self._continuation_text(final_text) self._fallback_final_send = False @@ -339,12 +349,25 @@ class GatewayStreamConsumer: last_successful_chunk = "" sent_any_chunk = False for chunk in chunks: - result = await self.adapter.send( - chat_id=self.chat_id, - content=chunk, - metadata=self.metadata, - ) - if not result.success: + # Try sending with one retry on flood-control errors. + result = None + for attempt in range(2): + result = await self.adapter.send( + chat_id=self.chat_id, + content=chunk, + metadata=self.metadata, + ) + if result.success: + break + if attempt == 0 and self._is_flood_error(result): + logger.debug( + "Flood control on fallback send, retrying in 3s" + ) + await asyncio.sleep(3.0) + else: + break # non-flood error or second attempt failed + + if not result or not result.success: if sent_any_chunk: # Some continuation text already reached the user. Suppress # the base gateway final-send path so we don't resend the @@ -370,20 +393,52 @@ class GatewayStreamConsumer: self._last_sent_text = chunks[-1] self._fallback_prefix = "" - async def _send_or_edit(self, text: str) -> None: - """Send or edit the streaming message.""" + def _is_flood_error(self, result) -> bool: + """Check if a SendResult failure is due to flood control / rate limiting.""" + err = getattr(result, "error", "") or "" + err_lower = err.lower() + return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower + + async def _try_strip_cursor(self) -> None: + """Best-effort edit to remove the cursor from the last visible message. + + Called when entering fallback mode so the user doesn't see a stuck + cursor (▉) in the partial message. + """ + if not self._message_id or self._message_id == "__no_edit__": + return + prefix = self._visible_prefix() + if not prefix or not prefix.strip(): + return + try: + await self.adapter.edit_message( + chat_id=self.chat_id, + message_id=self._message_id, + content=prefix, + ) + self._last_sent_text = prefix + except Exception: + pass # best-effort — don't let this block the fallback path + + async def _send_or_edit(self, text: str) -> bool: + """Send or edit the streaming message. + + Returns True if the text was successfully delivered (sent or edited), + False otherwise. Callers like the overflow split loop use this to + decide whether to advance past the delivered chunk. + """ # Strip MEDIA: directives so they don't appear as visible text. # Media files are delivered as native attachments after the stream # finishes (via _deliver_media_from_response in gateway/run.py). text = self._clean_for_display(text) if not text.strip(): - return + return True # nothing to send is "success" try: if self._message_id is not None: if self._edit_supported: # Skip if text is identical to what we last sent if text == self._last_sent_text: - return + return True # Edit existing message result = await self.adapter.edit_message( chat_id=self.chat_id, @@ -393,19 +448,52 @@ class GatewayStreamConsumer: if result.success: self._already_sent = True self._last_sent_text = text + # Successful edit — reset flood strike counter + self._flood_strikes = 0 + return True else: - # If an edit fails mid-stream (especially Telegram flood control), - # stop progressive edits and send only the missing tail once the + # Edit failed. If this looks like flood control / rate + # limiting, use adaptive backoff: double the edit interval + # and retry on the next cycle. Only permanently disable + # edits after _MAX_FLOOD_STRIKES consecutive failures. + if self._is_flood_error(result): + self._flood_strikes += 1 + self._current_edit_interval = min( + self._current_edit_interval * 2, 10.0, + ) + logger.debug( + "Flood control on edit (strike %d/%d), " + "backoff interval → %.1fs", + self._flood_strikes, + self._MAX_FLOOD_STRIKES, + self._current_edit_interval, + ) + if self._flood_strikes < self._MAX_FLOOD_STRIKES: + # Don't disable edits yet — just slow down. + # Update _last_edit_time so the next edit + # respects the new interval. + self._last_edit_time = time.monotonic() + return False + + # Non-flood error OR flood strikes exhausted: enter + # fallback mode — send only the missing tail once the # final response is available. - logger.debug("Edit failed, disabling streaming for this adapter") + logger.debug( + "Edit failed (strikes=%d), entering fallback mode", + self._flood_strikes, + ) self._fallback_prefix = self._visible_prefix() self._fallback_final_send = True self._edit_supported = False self._already_sent = True + # Best-effort: strip the cursor from the last visible + # message so the user doesn't see a stuck ▉. + await self._try_strip_cursor() + return False else: # Editing not supported — skip intermediate updates. # The final response will be sent by the fallback path. - pass + return False else: # First message — send new result = await self.adapter.send( @@ -417,6 +505,7 @@ class GatewayStreamConsumer: self._message_id = result.message_id self._already_sent = True self._last_sent_text = text + return True elif result.success: # Platform accepted the message but returned no message_id # (e.g. Signal). Can't edit without an ID — switch to @@ -428,8 +517,11 @@ class GatewayStreamConsumer: self._fallback_final_send = True # Sentinel prevents re-entering this branch on every delta self._message_id = "__no_edit__" + return True # platform accepted, just can't edit else: # Initial send failed — disable streaming for this session self._edit_supported = False + return False except Exception as e: logger.error("Stream send/edit error: %s", e) + return False