@dataclass
class StreamingConfig:
    """Configuration for real-time token streaming to messaging platforms.

    Controls whether streaming is on, which transport is used, and the
    pacing of progressive message edits.
    """

    enabled: bool = True
    transport: str = "edit"      # "edit" (progressive editMessageText) or "off"
    edit_interval: float = 0.3   # Seconds between message edits
    buffer_threshold: int = 40   # Chars before forcing an edit
    cursor: str = " ▉"           # Cursor shown during streaming

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict; inverse of :meth:`from_dict`."""
        return {
            "enabled": self.enabled,
            "transport": self.transport,
            "edit_interval": self.edit_interval,
            "buffer_threshold": self.buffer_threshold,
            "cursor": self.cursor,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "StreamingConfig":
        """Build a config from a (possibly partial, empty, or None) dict.

        Unknown keys are ignored; missing keys take their defaults.
        Numeric fields that are present but unparseable (e.g. an explicit
        ``null`` in the config file) fall back to the defaults instead of
        raising, so a malformed streaming section can't break config load.
        """
        if not data:
            return cls()

        def _coerce_num(value: Any, caster, default):
            # float(None) / int("oops") would raise and abort config loading;
            # degrade to the built-in default instead.
            try:
                return caster(value)
            except (TypeError, ValueError):
                return default

        return cls(
            enabled=data.get("enabled", True),
            transport=data.get("transport", "edit"),
            edit_interval=_coerce_num(data.get("edit_interval", 0.3), float, 0.3),
            buffer_threshold=_coerce_num(data.get("buffer_threshold", 40), int, 40),
            cursor=data.get("cursor", " ▉"),
        )
group_sessions_per_user=_coerce_bool(group_sessions_per_user, True), + streaming=StreamingConfig.from_dict(data.get("streaming", {})), ) diff --git a/gateway/run.py b/gateway/run.py index a7e637ec69..8bc860c3a0 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1915,6 +1915,11 @@ class GatewayRunner: if self._should_send_voice_reply(event, response, agent_messages): await self._send_voice_reply(event, response) + # If streaming already delivered the response, return None so + # _process_message_background doesn't send it again. + if agent_result.get("already_sent"): + return None + return response except Exception as e: @@ -4080,6 +4085,7 @@ class GatewayRunner: agent_holder = [None] # Mutable container for the agent instance result_holder = [None] # Mutable container for the result tools_holder = [None] # Mutable container for the tool definitions + stream_consumer_holder = [None] # Mutable container for stream consumer # Bridge sync step_callback → async hooks.emit for agent:step events _loop_for_step = asyncio.get_event_loop() @@ -4142,6 +4148,35 @@ class GatewayRunner: honcho_manager, honcho_config = self._get_or_create_gateway_honcho(session_key) reasoning_config = self._load_reasoning_config() self._reasoning_config = reasoning_config + # Set up streaming consumer if enabled + _stream_consumer = None + _stream_delta_cb = None + _scfg = getattr(getattr(self, 'config', None), 'streaming', None) + if _scfg is None: + from gateway.config import StreamingConfig + _scfg = StreamingConfig() + + if _scfg.enabled and _scfg.transport != "off": + try: + from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig + _adapter = self.adapters.get(source.platform) + if _adapter: + _consumer_cfg = StreamConsumerConfig( + edit_interval=_scfg.edit_interval, + buffer_threshold=_scfg.buffer_threshold, + cursor=_scfg.cursor, + ) + _stream_consumer = GatewayStreamConsumer( + adapter=_adapter, + chat_id=source.chat_id, + config=_consumer_cfg, + 
metadata={"thread_id": source.thread_id} if source.thread_id else None, + ) + _stream_delta_cb = _stream_consumer.on_delta + stream_consumer_holder[0] = _stream_consumer + except Exception as _sc_err: + logger.debug("Could not set up stream consumer: %s", _sc_err) + agent = AIAgent( model=model, **runtime_kwargs, @@ -4161,6 +4196,7 @@ class GatewayRunner: session_id=session_id, tool_progress_callback=progress_callback if tool_progress_enabled else None, step_callback=_step_callback_sync if _hooks_ref.loaded_hooks else None, + stream_delta_callback=_stream_delta_cb, platform=platform_key, honcho_session_key=session_key, honcho_manager=honcho_manager, @@ -4231,6 +4267,10 @@ class GatewayRunner: result = agent.run_conversation(message, conversation_history=agent_history, task_id=session_id) result_holder[0] = result + + # Signal the stream consumer that the agent is done + if _stream_consumer is not None: + _stream_consumer.finish() # Return final response, or a message if something went wrong final_response = result.get("final_response") @@ -4330,6 +4370,11 @@ class GatewayRunner: progress_task = None if tool_progress_enabled: progress_task = asyncio.create_task(send_progress_messages()) + + # Start stream consumer task if configured + stream_task = None + if stream_consumer_holder[0] is not None: + stream_task = asyncio.create_task(stream_consumer_holder[0].run()) # Track this agent as running for this session (for interrupt support) # We do this in a callback after the agent is created @@ -4412,6 +4457,17 @@ class GatewayRunner: if progress_task: progress_task.cancel() interrupt_monitor.cancel() + + # Wait for stream consumer to finish its final edit + if stream_task: + try: + await asyncio.wait_for(stream_task, timeout=5.0) + except (asyncio.TimeoutError, asyncio.CancelledError): + stream_task.cancel() + try: + await stream_task + except asyncio.CancelledError: + pass # Clean up tracking tracking_task.cancel() @@ -4425,6 +4481,12 @@ class GatewayRunner: await task 
except asyncio.CancelledError: pass + + # If streaming already delivered the response, mark it so the + # caller's send() is skipped (avoiding duplicate messages). + _sc = stream_consumer_holder[0] + if _sc and _sc.already_sent and isinstance(response, dict): + response["already_sent"] = True return response diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py new file mode 100644 index 0000000000..f6ab004c7c --- /dev/null +++ b/gateway/stream_consumer.py @@ -0,0 +1,172 @@ +"""Gateway streaming consumer — bridges sync agent callbacks to async platform delivery. + +The agent fires stream_delta_callback(text) synchronously from its worker thread. +GatewayStreamConsumer: + 1. Receives deltas via on_delta() (thread-safe, sync) + 2. Queues them to an asyncio task via queue.Queue + 3. The async run() task buffers, rate-limits, and progressively edits + a single message on the target platform + +Design: Uses the edit transport (send initial message, then editMessageText). +This is universally supported across Telegram, Discord, and Slack. + +Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697). +""" + +from __future__ import annotations + +import asyncio +import logging +import queue +import time +from dataclasses import dataclass +from typing import Any, Optional + +logger = logging.getLogger("gateway.stream_consumer") + +# Sentinel to signal the stream is complete +_DONE = object() + + +@dataclass +class StreamConsumerConfig: + """Runtime config for a single stream consumer instance.""" + edit_interval: float = 0.3 + buffer_threshold: int = 40 + cursor: str = " ▉" + + +class GatewayStreamConsumer: + """Async consumer that progressively edits a platform message with streamed tokens. 
+ + Usage:: + + consumer = GatewayStreamConsumer(adapter, chat_id, config, metadata=metadata) + # Pass consumer.on_delta as stream_delta_callback to AIAgent + agent = AIAgent(..., stream_delta_callback=consumer.on_delta) + # Start the consumer as an asyncio task + task = asyncio.create_task(consumer.run()) + # ... run agent in thread pool ... + consumer.finish() # signal completion + await task # wait for final edit + """ + + def __init__( + self, + adapter: Any, + chat_id: str, + config: Optional[StreamConsumerConfig] = None, + metadata: Optional[dict] = None, + ): + self.adapter = adapter + self.chat_id = chat_id + self.cfg = config or StreamConsumerConfig() + self.metadata = metadata + self._queue: queue.Queue = queue.Queue() + self._accumulated = "" + self._message_id: Optional[str] = None + self._already_sent = False + self._last_edit_time = 0.0 + + @property + def already_sent(self) -> bool: + """True if at least one message was sent/edited — signals the base + adapter to skip re-sending the final response.""" + return self._already_sent + + def on_delta(self, text: str) -> None: + """Thread-safe callback — called from the agent's worker thread.""" + if text: + self._queue.put(text) + + def finish(self) -> None: + """Signal that the stream is complete.""" + self._queue.put(_DONE) + + async def run(self) -> None: + """Async task that drains the queue and edits the platform message.""" + try: + while True: + # Drain all available items from the queue + got_done = False + while True: + try: + item = self._queue.get_nowait() + if item is _DONE: + got_done = True + break + self._accumulated += item + except queue.Empty: + break + + # Decide whether to flush an edit + now = time.monotonic() + elapsed = now - self._last_edit_time + should_edit = ( + got_done + or (elapsed >= self.cfg.edit_interval + and len(self._accumulated) > 0) + or len(self._accumulated) >= self.cfg.buffer_threshold + ) + + if should_edit and self._accumulated: + display_text = self._accumulated 
+ if not got_done: + display_text += self.cfg.cursor + + await self._send_or_edit(display_text) + self._last_edit_time = time.monotonic() + + if got_done: + # Final edit without cursor + if self._accumulated and self._message_id: + await self._send_or_edit(self._accumulated) + return + + await asyncio.sleep(0.05) # Small yield to not busy-loop + + except asyncio.CancelledError: + # Best-effort final edit on cancellation + if self._accumulated and self._message_id: + try: + await self._send_or_edit(self._accumulated) + except Exception: + pass + except Exception as e: + logger.error("Stream consumer error: %s", e) + + async def _send_or_edit(self, text: str) -> None: + """Send or edit the streaming message.""" + try: + if self._message_id is not None: + # Edit existing message + result = await self.adapter.edit_message( + chat_id=self.chat_id, + message_id=self._message_id, + content=text, + ) + if result.success: + self._already_sent = True + else: + # Edit failed — try sending as new message + logger.debug("Edit failed, sending new message") + result = await self.adapter.send( + chat_id=self.chat_id, + content=text, + metadata=self.metadata, + ) + if result.success and result.message_id: + self._message_id = result.message_id + self._already_sent = True + else: + # First message — send new + result = await self.adapter.send( + chat_id=self.chat_id, + content=text, + metadata=self.metadata, + ) + if result.success and result.message_id: + self._message_id = result.message_id + self._already_sent = True + except Exception as e: + logger.error("Stream send/edit error: %s", e)