fix(streaming): adaptive backoff + cursor strip to prevent message truncation

Telegram flood control during streaming caused messages to be cut off
mid-response. The old behavior permanently disabled edits after a single
flood-control failure, losing the remainder of the response.

Changes:
- Adaptive backoff: on flood-control edit failures, double the edit interval
  instead of immediately disabling edits. Only permanently disable after 3
  consecutive failures (_MAX_FLOOD_STRIKES).
- Cursor strip: when entering fallback mode, best-effort edit to remove the
  cursor (▉) from the last visible message so it doesn't appear stuck.
- Fallback send retry: _send_fallback_final retries each chunk once on
  flood-control failures (3s delay) before giving up.
- Default edit_interval increased from 0.3s to 1.0s. Telegram rate-limits
  edits at ~1/s per message; 0.3s was virtually guaranteed to trigger flood
  control on any non-trivial response.
- _send_or_edit returns bool so the overflow split loop knows not to
  truncate accumulated text when an edit fails (prevents content loss).

Fixes: messages cutting/stopping mid-response on Telegram, especially
with streaming enabled.
This commit is contained in:
Teknium 2026-04-11 03:50:33 -07:00
parent af9caec44f
commit ad85285915
No known key found for this signature in database
2 changed files with 117 additions and 25 deletions

View file

@@ -190,7 +190,7 @@ class StreamingConfig:
"""Configuration for real-time token streaming to messaging platforms."""
enabled: bool = False
transport: str = "edit" # "edit" (progressive editMessageText) or "off"
edit_interval: float = 0.3 # Seconds between message edits
edit_interval: float = 1.0 # Seconds between message edits (Telegram rate-limits at ~1/s)
buffer_threshold: int = 40 # Chars before forcing an edit
cursor: str = "" # Cursor shown during streaming
@@ -210,7 +210,7 @@ class StreamingConfig:
return cls(
enabled=data.get("enabled", False),
transport=data.get("transport", "edit"),
edit_interval=float(data.get("edit_interval", 0.3)),
edit_interval=float(data.get("edit_interval", 1.0)),
buffer_threshold=int(data.get("buffer_threshold", 40)),
cursor=data.get("cursor", ""),
)

View file

@@ -36,7 +36,7 @@ _NEW_SEGMENT = object()
@dataclass
class StreamConsumerConfig:
"""Runtime config for a single stream consumer instance."""
edit_interval: float = 0.3
edit_interval: float = 1.0
buffer_threshold: int = 40
cursor: str = ""
@@ -56,6 +56,10 @@ class GatewayStreamConsumer:
await task # wait for final edit
"""
# After this many consecutive flood-control failures, permanently disable
# progressive edits for the remainder of the stream.
_MAX_FLOOD_STRIKES = 3
def __init__(
self,
adapter: Any,
@@ -76,6 +80,8 @@ class GatewayStreamConsumer:
self._last_sent_text = "" # Track last-sent text to skip redundant edits
self._fallback_final_send = False
self._fallback_prefix = ""
self._flood_strikes = 0 # Consecutive flood-control edit failures
self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff
@property
def already_sent(self) -> bool:
@@ -129,7 +135,7 @@ class GatewayStreamConsumer:
should_edit = (
got_done
or got_segment_break
or (elapsed >= self.cfg.edit_interval
or (elapsed >= self._current_edit_interval
and self._accumulated)
or len(self._accumulated) >= self.cfg.buffer_threshold
)
@@ -173,12 +179,13 @@ class GatewayStreamConsumer:
if split_at < _safe_limit // 2:
split_at = _safe_limit
chunk = self._accumulated[:split_at]
await self._send_or_edit(chunk)
if self._fallback_final_send:
# Edit failed while attempting to split an oversized
# message. Keep the full accumulated text intact so
# the fallback final-send path can deliver the
# remaining continuation without dropping content.
ok = await self._send_or_edit(chunk)
if self._fallback_final_send or not ok:
# Edit failed (or backed off due to flood control)
# while attempting to split an oversized message.
# Keep the full accumulated text intact so the
# fallback final-send path can deliver the remaining
# continuation without dropping content.
break
self._accumulated = self._accumulated[split_at:].lstrip("\n")
self._message_id = None
@@ -322,7 +329,10 @@ class GatewayStreamConsumer:
return chunks
async def _send_fallback_final(self, text: str) -> None:
"""Send the final continuation after streaming edits stop working."""
"""Send the final continuation after streaming edits stop working.
Retries each chunk once on flood-control failures with a short delay.
"""
final_text = self._clean_for_display(text)
continuation = self._continuation_text(final_text)
self._fallback_final_send = False
@@ -339,12 +349,25 @@ class GatewayStreamConsumer:
last_successful_chunk = ""
sent_any_chunk = False
for chunk in chunks:
result = await self.adapter.send(
chat_id=self.chat_id,
content=chunk,
metadata=self.metadata,
)
if not result.success:
# Try sending with one retry on flood-control errors.
result = None
for attempt in range(2):
result = await self.adapter.send(
chat_id=self.chat_id,
content=chunk,
metadata=self.metadata,
)
if result.success:
break
if attempt == 0 and self._is_flood_error(result):
logger.debug(
"Flood control on fallback send, retrying in 3s"
)
await asyncio.sleep(3.0)
else:
break # non-flood error or second attempt failed
if not result or not result.success:
if sent_any_chunk:
# Some continuation text already reached the user. Suppress
# the base gateway final-send path so we don't resend the
@@ -370,20 +393,52 @@ class GatewayStreamConsumer:
self._last_sent_text = chunks[-1]
self._fallback_prefix = ""
async def _send_or_edit(self, text: str) -> None:
"""Send or edit the streaming message."""
def _is_flood_error(self, result) -> bool:
"""Check if a SendResult failure is due to flood control / rate limiting."""
err = getattr(result, "error", "") or ""
err_lower = err.lower()
return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower
async def _try_strip_cursor(self) -> None:
"""Best-effort edit to remove the cursor from the last visible message.
Called when entering fallback mode so the user doesn't see a stuck
cursor (▉) in the partial message.
"""
if not self._message_id or self._message_id == "__no_edit__":
return
prefix = self._visible_prefix()
if not prefix or not prefix.strip():
return
try:
await self.adapter.edit_message(
chat_id=self.chat_id,
message_id=self._message_id,
content=prefix,
)
self._last_sent_text = prefix
except Exception:
pass # best-effort — don't let this block the fallback path
async def _send_or_edit(self, text: str) -> bool:
"""Send or edit the streaming message.
Returns True if the text was successfully delivered (sent or edited),
False otherwise. Callers like the overflow split loop use this to
decide whether to advance past the delivered chunk.
"""
# Strip MEDIA: directives so they don't appear as visible text.
# Media files are delivered as native attachments after the stream
# finishes (via _deliver_media_from_response in gateway/run.py).
text = self._clean_for_display(text)
if not text.strip():
return
return True # nothing to send is "success"
try:
if self._message_id is not None:
if self._edit_supported:
# Skip if text is identical to what we last sent
if text == self._last_sent_text:
return
return True
# Edit existing message
result = await self.adapter.edit_message(
chat_id=self.chat_id,
@@ -393,19 +448,52 @@ class GatewayStreamConsumer:
if result.success:
self._already_sent = True
self._last_sent_text = text
# Successful edit — reset flood strike counter
self._flood_strikes = 0
return True
else:
# If an edit fails mid-stream (especially Telegram flood control),
# stop progressive edits and send only the missing tail once the
# Edit failed. If this looks like flood control / rate
# limiting, use adaptive backoff: double the edit interval
# and retry on the next cycle. Only permanently disable
# edits after _MAX_FLOOD_STRIKES consecutive failures.
if self._is_flood_error(result):
self._flood_strikes += 1
self._current_edit_interval = min(
self._current_edit_interval * 2, 10.0,
)
logger.debug(
"Flood control on edit (strike %d/%d), "
"backoff interval → %.1fs",
self._flood_strikes,
self._MAX_FLOOD_STRIKES,
self._current_edit_interval,
)
if self._flood_strikes < self._MAX_FLOOD_STRIKES:
# Don't disable edits yet — just slow down.
# Update _last_edit_time so the next edit
# respects the new interval.
self._last_edit_time = time.monotonic()
return False
# Non-flood error OR flood strikes exhausted: enter
# fallback mode — send only the missing tail once the
# final response is available.
logger.debug("Edit failed, disabling streaming for this adapter")
logger.debug(
"Edit failed (strikes=%d), entering fallback mode",
self._flood_strikes,
)
self._fallback_prefix = self._visible_prefix()
self._fallback_final_send = True
self._edit_supported = False
self._already_sent = True
# Best-effort: strip the cursor from the last visible
# message so the user doesn't see a stuck ▉.
await self._try_strip_cursor()
return False
else:
# Editing not supported — skip intermediate updates.
# The final response will be sent by the fallback path.
pass
return False
else:
# First message — send new
result = await self.adapter.send(
@@ -417,6 +505,7 @@ class GatewayStreamConsumer:
self._message_id = result.message_id
self._already_sent = True
self._last_sent_text = text
return True
elif result.success:
# Platform accepted the message but returned no message_id
# (e.g. Signal). Can't edit without an ID — switch to
@@ -428,8 +517,11 @@ class GatewayStreamConsumer:
self._fallback_final_send = True
# Sentinel prevents re-entering this branch on every delta
self._message_id = "__no_edit__"
return True # platform accepted, just can't edit
else:
# Initial send failed — disable streaming for this session
self._edit_supported = False
return False
except Exception as e:
logger.error("Stream send/edit error: %s", e)
return False