"""Gateway streaming consumer — bridges sync agent callbacks to async platform delivery. The agent fires stream_delta_callback(text) synchronously from its worker thread. GatewayStreamConsumer: 1. Receives deltas via on_delta() (thread-safe, sync) 2. Queues them to an asyncio task via queue.Queue 3. The async run() task buffers, rate-limits, and progressively edits a single message on the target platform Design: Uses the edit transport (send initial message, then editMessageText). This is universally supported across Telegram, Discord, and Slack. Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697). """ from __future__ import annotations import asyncio import logging import queue import re import time from dataclasses import dataclass from typing import Any, Optional logger = logging.getLogger("gateway.stream_consumer") # Sentinel to signal the stream is complete _DONE = object() # Sentinel to signal a tool boundary — finalize current message and start a # new one so that subsequent text appears below tool progress messages. _NEW_SEGMENT = object() @dataclass class StreamConsumerConfig: """Runtime config for a single stream consumer instance.""" edit_interval: float = 0.3 buffer_threshold: int = 40 cursor: str = " ▉" class GatewayStreamConsumer: """Async consumer that progressively edits a platform message with streamed tokens. Usage:: consumer = GatewayStreamConsumer(adapter, chat_id, config, metadata=metadata) # Pass consumer.on_delta as stream_delta_callback to AIAgent agent = AIAgent(..., stream_delta_callback=consumer.on_delta) # Start the consumer as an asyncio task task = asyncio.create_task(consumer.run()) # ... run agent in thread pool ... consumer.finish() # signal completion await task # wait for final edit """ def __init__( self, adapter: Any, chat_id: str, config: Optional[StreamConsumerConfig] = None, metadata: Optional[dict] = None, ): self.adapter = adapter self.chat_id = chat_id self.cfg = config or StreamConsumerConfig() self.metadata = metadata self._queue: queue.Queue = queue.Queue() self._accumulated = "" self._message_id: Optional[str] = None self._already_sent = False self._edit_supported = True # Disabled on first edit failure (Signal/Email/HA) self._last_edit_time = 0.0 self._last_sent_text = "" # Track last-sent text to skip redundant edits self._fallback_final_send = False self._fallback_prefix = "" @property def already_sent(self) -> bool: """True if at least one message was sent/edited — signals the base adapter to skip re-sending the final response.""" return self._already_sent def on_delta(self, text: str) -> None: """Thread-safe callback — called from the agent's worker thread. When *text* is ``None``, signals a tool boundary: the current message is finalized and subsequent text will be sent as a new message so it appears below any tool-progress messages the gateway sent in between. """ if text: self._queue.put(text) elif text is None: self._queue.put(_NEW_SEGMENT) def finish(self) -> None: """Signal that the stream is complete.""" self._queue.put(_DONE) async def run(self) -> None: """Async task that drains the queue and edits the platform message.""" # Platform message length limit — leave room for cursor + formatting _raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096) _safe_limit = max(500, _raw_limit - len(self.cfg.cursor) - 100) try: while True: # Drain all available items from the queue got_done = False got_segment_break = False while True: try: item = self._queue.get_nowait() if item is _DONE: got_done = True break if item is _NEW_SEGMENT: got_segment_break = True break self._accumulated += item except queue.Empty: break # Decide whether to flush an edit now = time.monotonic() elapsed = now - self._last_edit_time should_edit = ( got_done or got_segment_break or (elapsed >= self.cfg.edit_interval and self._accumulated) or len(self._accumulated) >= self.cfg.buffer_threshold ) if should_edit and self._accumulated: # Split overflow: if accumulated text exceeds the platform # limit, split into properly sized chunks. if ( len(self._accumulated) > _safe_limit and self._message_id is None ): # No existing message to edit (first message or after a # segment break). Use truncate_message — the same # helper the non-streaming path uses — to split with # proper word/code-fence boundaries and chunk # indicators like "(1/2)". chunks = self.adapter.truncate_message( self._accumulated, _safe_limit ) for chunk in chunks: await self._send_new_chunk(chunk, self._message_id) self._accumulated = "" self._last_sent_text = "" self._last_edit_time = time.monotonic() if got_done: return if got_segment_break: self._message_id = None self._fallback_final_send = False self._fallback_prefix = "" continue # Existing message: edit it with the first chunk, then # start a new message for the overflow remainder. while ( len(self._accumulated) > _safe_limit and self._message_id is not None and self._edit_supported ): split_at = self._accumulated.rfind("\n", 0, _safe_limit) if split_at < _safe_limit // 2: split_at = _safe_limit chunk = self._accumulated[:split_at] await self._send_or_edit(chunk) if self._fallback_final_send: # Edit failed while attempting to split an oversized # message. Keep the full accumulated text intact so # the fallback final-send path can deliver the # remaining continuation without dropping content. break self._accumulated = self._accumulated[split_at:].lstrip("\n") self._message_id = None self._last_sent_text = "" display_text = self._accumulated if not got_done and not got_segment_break: display_text += self.cfg.cursor await self._send_or_edit(display_text) self._last_edit_time = time.monotonic() if got_done: # Final edit without cursor. If progressive editing failed # mid-stream, send a single continuation/fallback message # here instead of letting the base gateway path send the # full response again. if self._accumulated: if self._fallback_final_send: await self._send_fallback_final(self._accumulated) elif self._message_id: await self._send_or_edit(self._accumulated) elif not self._already_sent: await self._send_or_edit(self._accumulated) return # Tool boundary: reset message state so the next text chunk # creates a fresh message below any tool-progress messages. # # Exception: when _message_id is "__no_edit__" the platform # never returned a real message ID (e.g. Signal, webhook with # github_comment delivery). Resetting to None would re-enter # the "first send" path on every tool boundary and post one # platform message per tool call — that is what caused 155 # comments under a single PR. Instead, keep all state so the # full continuation is delivered once via _send_fallback_final. # (When editing fails mid-stream due to flood control the id is # a real string like "msg_1", not "__no_edit__", so that case # still resets and creates a fresh segment as intended.) if got_segment_break and self._message_id != "__no_edit__": self._message_id = None self._accumulated = "" self._last_sent_text = "" self._fallback_final_send = False self._fallback_prefix = "" await asyncio.sleep(0.05) # Small yield to not busy-loop except asyncio.CancelledError: # Best-effort final edit on cancellation if self._accumulated and self._message_id: try: await self._send_or_edit(self._accumulated) except Exception: pass except Exception as e: logger.error("Stream consumer error: %s", e) # Pattern to strip MEDIA: tags (including optional surrounding quotes). # Matches the simple cleanup regex used by the non-streaming path in # gateway/platforms/base.py for post-processing. _MEDIA_RE = re.compile(r'''[`"']?MEDIA:\s*\S+[`"']?''') @staticmethod def _clean_for_display(text: str) -> str: """Strip MEDIA: directives and internal markers from text before display. The streaming path delivers raw text chunks that may include ``MEDIA:`` tags and ``[[audio_as_voice]]`` directives meant for the platform adapter's post-processing. The actual media files are delivered separately via ``_deliver_media_from_response()`` after the stream finishes — we just need to hide the raw directives from the user. """ if "MEDIA:" not in text and "[[audio_as_voice]]" not in text: return text cleaned = text.replace("[[audio_as_voice]]", "") cleaned = GatewayStreamConsumer._MEDIA_RE.sub("", cleaned) # Collapse excessive blank lines left behind by removed tags cleaned = re.sub(r'\n{3,}', '\n\n', cleaned) # Strip trailing whitespace/newlines but preserve leading content return cleaned.rstrip() async def _send_new_chunk(self, text: str, reply_to_id: Optional[str]) -> Optional[str]: """Send a new message chunk, optionally threaded to a previous message. Returns the message_id so callers can thread subsequent chunks. """ text = self._clean_for_display(text) if not text.strip(): return reply_to_id try: meta = dict(self.metadata) if self.metadata else {} result = await self.adapter.send( chat_id=self.chat_id, content=text, reply_to=reply_to_id, metadata=meta, ) if result.success and result.message_id: self._message_id = str(result.message_id) self._already_sent = True self._last_sent_text = text return str(result.message_id) else: self._edit_supported = False return reply_to_id except Exception as e: logger.error("Stream send chunk error: %s", e) return reply_to_id def _visible_prefix(self) -> str: """Return the visible text already shown in the streamed message.""" prefix = self._last_sent_text or "" if self.cfg.cursor and prefix.endswith(self.cfg.cursor): prefix = prefix[:-len(self.cfg.cursor)] return self._clean_for_display(prefix) def _continuation_text(self, final_text: str) -> str: """Return only the part of final_text the user has not already seen.""" prefix = self._fallback_prefix or self._visible_prefix() if prefix and final_text.startswith(prefix): return final_text[len(prefix):].lstrip() return final_text @staticmethod def _split_text_chunks(text: str, limit: int) -> list[str]: """Split text into reasonably sized chunks for fallback sends.""" if len(text) <= limit: return [text] chunks: list[str] = [] remaining = text while len(remaining) > limit: split_at = remaining.rfind("\n", 0, limit) if split_at < limit // 2: split_at = limit chunks.append(remaining[:split_at]) remaining = remaining[split_at:].lstrip("\n") if remaining: chunks.append(remaining) return chunks async def _send_fallback_final(self, text: str) -> None: """Send the final continuation after streaming edits stop working.""" final_text = self._clean_for_display(text) continuation = self._continuation_text(final_text) self._fallback_final_send = False if not continuation.strip(): # Nothing new to send — the visible partial already matches final text. self._already_sent = True return raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096) safe_limit = max(500, raw_limit - 100) chunks = self._split_text_chunks(continuation, safe_limit) last_message_id: Optional[str] = None last_successful_chunk = "" sent_any_chunk = False for chunk in chunks: result = await self.adapter.send( chat_id=self.chat_id, content=chunk, metadata=self.metadata, ) if not result.success: if sent_any_chunk: # Some continuation text already reached the user. Suppress # the base gateway final-send path so we don't resend the # full response and create another duplicate. self._already_sent = True self._message_id = last_message_id self._last_sent_text = last_successful_chunk self._fallback_prefix = "" return # No fallback chunk reached the user — allow the normal gateway # final-send path to try one more time. self._already_sent = False self._message_id = None self._last_sent_text = "" self._fallback_prefix = "" return sent_any_chunk = True last_successful_chunk = chunk last_message_id = result.message_id or last_message_id self._message_id = last_message_id self._already_sent = True self._last_sent_text = chunks[-1] self._fallback_prefix = "" async def _send_or_edit(self, text: str) -> None: """Send or edit the streaming message.""" # Strip MEDIA: directives so they don't appear as visible text. # Media files are delivered as native attachments after the stream # finishes (via _deliver_media_from_response in gateway/run.py). text = self._clean_for_display(text) if not text.strip(): return try: if self._message_id is not None: if self._edit_supported: # Skip if text is identical to what we last sent if text == self._last_sent_text: return # Edit existing message result = await self.adapter.edit_message( chat_id=self.chat_id, message_id=self._message_id, content=text, ) if result.success: self._already_sent = True self._last_sent_text = text else: # If an edit fails mid-stream (especially Telegram flood control), # stop progressive edits and send only the missing tail once the # final response is available. logger.debug("Edit failed, disabling streaming for this adapter") self._fallback_prefix = self._visible_prefix() self._fallback_final_send = True self._edit_supported = False self._already_sent = True else: # Editing not supported — skip intermediate updates. # The final response will be sent by the fallback path. pass else: # First message — send new result = await self.adapter.send( chat_id=self.chat_id, content=text, metadata=self.metadata, ) if result.success and result.message_id: self._message_id = result.message_id self._already_sent = True self._last_sent_text = text elif result.success: # Platform accepted the message but returned no message_id # (e.g. Signal). Can't edit without an ID — switch to # fallback mode: suppress intermediate deltas, send only # the missing tail once the final response is ready. self._already_sent = True self._edit_supported = False self._fallback_prefix = self._clean_for_display(text) self._fallback_final_send = True # Sentinel prevents re-entering this branch on every delta self._message_id = "__no_edit__" else: # Initial send failed — disable streaming for this session self._edit_supported = False except Exception as e: logger.error("Stream send/edit error: %s", e)