mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
592 lines
26 KiB
Python
592 lines
26 KiB
Python
"""Gateway streaming consumer — bridges sync agent callbacks to async platform delivery.
|
|
|
|
The agent fires stream_delta_callback(text) synchronously from its worker thread.
|
|
GatewayStreamConsumer:
|
|
1. Receives deltas via on_delta() (thread-safe, sync)
|
|
2. Queues them to an asyncio task via queue.Queue
|
|
3. The async run() task buffers, rate-limits, and progressively edits
|
|
a single message on the target platform
|
|
|
|
Design: Uses the edit transport (send initial message, then editMessageText).
|
|
This is universally supported across Telegram, Discord, and Slack.
|
|
|
|
Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import queue
|
|
import re
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Any, Optional
|
|
|
|
logger = logging.getLogger("gateway.stream_consumer")
|
|
|
|
# Sentinel to signal the stream is complete
|
|
_DONE = object()
|
|
|
|
# Sentinel to signal a tool boundary — finalize current message and start a
|
|
# new one so that subsequent text appears below tool progress messages.
|
|
_NEW_SEGMENT = object()
|
|
|
|
# Queue marker for a completed assistant commentary message emitted between
|
|
# API/tool iterations (for example: "I'll inspect the repo first.").
|
|
_COMMENTARY = object()
|
|
|
|
|
|
@dataclass
|
|
class StreamConsumerConfig:
|
|
"""Runtime config for a single stream consumer instance."""
|
|
edit_interval: float = 1.0
|
|
buffer_threshold: int = 40
|
|
cursor: str = " ▉"
|
|
|
|
|
|
class GatewayStreamConsumer:
|
|
"""Async consumer that progressively edits a platform message with streamed tokens.
|
|
|
|
Usage::
|
|
|
|
consumer = GatewayStreamConsumer(adapter, chat_id, config, metadata=metadata)
|
|
# Pass consumer.on_delta as stream_delta_callback to AIAgent
|
|
agent = AIAgent(..., stream_delta_callback=consumer.on_delta)
|
|
# Start the consumer as an asyncio task
|
|
task = asyncio.create_task(consumer.run())
|
|
# ... run agent in thread pool ...
|
|
consumer.finish() # signal completion
|
|
await task # wait for final edit
|
|
"""
|
|
|
|
# After this many consecutive flood-control failures, permanently disable
|
|
# progressive edits for the remainder of the stream.
|
|
_MAX_FLOOD_STRIKES = 3
|
|
|
|
def __init__(
|
|
self,
|
|
adapter: Any,
|
|
chat_id: str,
|
|
config: Optional[StreamConsumerConfig] = None,
|
|
metadata: Optional[dict] = None,
|
|
):
|
|
self.adapter = adapter
|
|
self.chat_id = chat_id
|
|
self.cfg = config or StreamConsumerConfig()
|
|
self.metadata = metadata
|
|
self._queue: queue.Queue = queue.Queue()
|
|
self._accumulated = ""
|
|
self._message_id: Optional[str] = None
|
|
self._already_sent = False
|
|
self._edit_supported = True # Disabled when progressive edits are no longer usable
|
|
self._last_edit_time = 0.0
|
|
self._last_sent_text = "" # Track last-sent text to skip redundant edits
|
|
self._fallback_final_send = False
|
|
self._fallback_prefix = ""
|
|
self._flood_strikes = 0 # Consecutive flood-control edit failures
|
|
self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff
|
|
self._final_response_sent = False
|
|
|
|
@property
|
|
def already_sent(self) -> bool:
|
|
"""True if at least one message was sent or edited during the run."""
|
|
return self._already_sent
|
|
|
|
@property
|
|
def final_response_sent(self) -> bool:
|
|
"""True when the stream consumer delivered the final assistant reply."""
|
|
return self._final_response_sent
|
|
|
|
def on_segment_break(self) -> None:
|
|
"""Finalize the current stream segment and start a fresh message."""
|
|
self._queue.put(_NEW_SEGMENT)
|
|
|
|
def on_commentary(self, text: str) -> None:
|
|
"""Queue a completed interim assistant commentary message."""
|
|
if text:
|
|
self._queue.put((_COMMENTARY, text))
|
|
|
|
def _reset_segment_state(self, *, preserve_no_edit: bool = False) -> None:
|
|
if preserve_no_edit and self._message_id == "__no_edit__":
|
|
return
|
|
self._message_id = None
|
|
self._accumulated = ""
|
|
self._last_sent_text = ""
|
|
self._fallback_final_send = False
|
|
self._fallback_prefix = ""
|
|
|
|
def on_delta(self, text: str) -> None:
|
|
"""Thread-safe callback — called from the agent's worker thread.
|
|
|
|
When *text* is ``None``, signals a tool boundary: the current message
|
|
is finalized and subsequent text will be sent as a new message so it
|
|
appears below any tool-progress messages the gateway sent in between.
|
|
"""
|
|
if text:
|
|
self._queue.put(text)
|
|
elif text is None:
|
|
self.on_segment_break()
|
|
|
|
def finish(self) -> None:
|
|
"""Signal that the stream is complete."""
|
|
self._queue.put(_DONE)
|
|
|
|
async def run(self) -> None:
|
|
"""Async task that drains the queue and edits the platform message."""
|
|
# Platform message length limit — leave room for cursor + formatting
|
|
_raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096)
|
|
_safe_limit = max(500, _raw_limit - len(self.cfg.cursor) - 100)
|
|
|
|
try:
|
|
while True:
|
|
# Drain all available items from the queue
|
|
got_done = False
|
|
got_segment_break = False
|
|
commentary_text = None
|
|
while True:
|
|
try:
|
|
item = self._queue.get_nowait()
|
|
if item is _DONE:
|
|
got_done = True
|
|
break
|
|
if item is _NEW_SEGMENT:
|
|
got_segment_break = True
|
|
break
|
|
if isinstance(item, tuple) and len(item) == 2 and item[0] is _COMMENTARY:
|
|
commentary_text = item[1]
|
|
break
|
|
self._accumulated += item
|
|
except queue.Empty:
|
|
break
|
|
|
|
# Decide whether to flush an edit
|
|
now = time.monotonic()
|
|
elapsed = now - self._last_edit_time
|
|
should_edit = (
|
|
got_done
|
|
or got_segment_break
|
|
or commentary_text is not None
|
|
or (elapsed >= self._current_edit_interval
|
|
and self._accumulated)
|
|
or len(self._accumulated) >= self.cfg.buffer_threshold
|
|
)
|
|
|
|
current_update_visible = False
|
|
if should_edit and self._accumulated:
|
|
# Split overflow: if accumulated text exceeds the platform
|
|
# limit, split into properly sized chunks.
|
|
if (
|
|
len(self._accumulated) > _safe_limit
|
|
and self._message_id is None
|
|
):
|
|
# No existing message to edit (first message or after a
|
|
# segment break). Use truncate_message — the same
|
|
# helper the non-streaming path uses — to split with
|
|
# proper word/code-fence boundaries and chunk
|
|
# indicators like "(1/2)".
|
|
chunks = self.adapter.truncate_message(
|
|
self._accumulated, _safe_limit
|
|
)
|
|
for chunk in chunks:
|
|
await self._send_new_chunk(chunk, self._message_id)
|
|
self._accumulated = ""
|
|
self._last_sent_text = ""
|
|
self._last_edit_time = time.monotonic()
|
|
if got_done:
|
|
self._final_response_sent = self._already_sent
|
|
return
|
|
if got_segment_break:
|
|
self._message_id = None
|
|
self._fallback_final_send = False
|
|
self._fallback_prefix = ""
|
|
continue
|
|
|
|
# Existing message: edit it with the first chunk, then
|
|
# start a new message for the overflow remainder.
|
|
while (
|
|
len(self._accumulated) > _safe_limit
|
|
and self._message_id is not None
|
|
and self._edit_supported
|
|
):
|
|
split_at = self._accumulated.rfind("\n", 0, _safe_limit)
|
|
if split_at < _safe_limit // 2:
|
|
split_at = _safe_limit
|
|
chunk = self._accumulated[:split_at]
|
|
ok = await self._send_or_edit(chunk)
|
|
if self._fallback_final_send or not ok:
|
|
# Edit failed (or backed off due to flood control)
|
|
# while attempting to split an oversized message.
|
|
# Keep the full accumulated text intact so the
|
|
# fallback final-send path can deliver the remaining
|
|
# continuation without dropping content.
|
|
break
|
|
self._accumulated = self._accumulated[split_at:].lstrip("\n")
|
|
self._message_id = None
|
|
self._last_sent_text = ""
|
|
|
|
display_text = self._accumulated
|
|
if not got_done and not got_segment_break and commentary_text is None:
|
|
display_text += self.cfg.cursor
|
|
|
|
current_update_visible = await self._send_or_edit(display_text)
|
|
self._last_edit_time = time.monotonic()
|
|
|
|
if got_done:
|
|
# Final edit without cursor. If progressive editing failed
|
|
# mid-stream, send a single continuation/fallback message
|
|
# here instead of letting the base gateway path send the
|
|
# full response again.
|
|
if self._accumulated:
|
|
if self._fallback_final_send:
|
|
await self._send_fallback_final(self._accumulated)
|
|
elif current_update_visible:
|
|
self._final_response_sent = True
|
|
elif self._message_id:
|
|
self._final_response_sent = await self._send_or_edit(self._accumulated)
|
|
elif not self._already_sent:
|
|
self._final_response_sent = await self._send_or_edit(self._accumulated)
|
|
return
|
|
|
|
if commentary_text is not None:
|
|
self._reset_segment_state()
|
|
await self._send_commentary(commentary_text)
|
|
self._last_edit_time = time.monotonic()
|
|
self._reset_segment_state()
|
|
|
|
# Tool boundary: reset message state so the next text chunk
|
|
# creates a fresh message below any tool-progress messages.
|
|
#
|
|
# Exception: when _message_id is "__no_edit__" the platform
|
|
# never returned a real message ID (e.g. Signal, webhook with
|
|
# github_comment delivery). Resetting to None would re-enter
|
|
# the "first send" path on every tool boundary and post one
|
|
# platform message per tool call — that is what caused 155
|
|
# comments under a single PR. Instead, preserve the sentinel
|
|
# so the full continuation is delivered once via
|
|
# _send_fallback_final.
|
|
# (When editing fails mid-stream due to flood control the id is
|
|
# a real string like "msg_1", not "__no_edit__", so that case
|
|
# still resets and creates a fresh segment as intended.)
|
|
if got_segment_break:
|
|
self._reset_segment_state(preserve_no_edit=True)
|
|
|
|
await asyncio.sleep(0.05) # Small yield to not busy-loop
|
|
|
|
except asyncio.CancelledError:
|
|
# Best-effort final edit on cancellation
|
|
if self._accumulated and self._message_id:
|
|
try:
|
|
await self._send_or_edit(self._accumulated)
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
logger.error("Stream consumer error: %s", e)
|
|
|
|
# Pattern to strip MEDIA:<path> tags (including optional surrounding quotes).
|
|
# Matches the simple cleanup regex used by the non-streaming path in
|
|
# gateway/platforms/base.py for post-processing.
|
|
_MEDIA_RE = re.compile(r'''[`"']?MEDIA:\s*\S+[`"']?''')
|
|
|
|
@staticmethod
|
|
def _clean_for_display(text: str) -> str:
|
|
"""Strip MEDIA: directives and internal markers from text before display.
|
|
|
|
The streaming path delivers raw text chunks that may include
|
|
``MEDIA:<path>`` tags and ``[[audio_as_voice]]`` directives meant for
|
|
the platform adapter's post-processing. The actual media files are
|
|
delivered separately via ``_deliver_media_from_response()`` after the
|
|
stream finishes — we just need to hide the raw directives from the
|
|
user.
|
|
"""
|
|
if "MEDIA:" not in text and "[[audio_as_voice]]" not in text:
|
|
return text
|
|
cleaned = text.replace("[[audio_as_voice]]", "")
|
|
cleaned = GatewayStreamConsumer._MEDIA_RE.sub("", cleaned)
|
|
# Collapse excessive blank lines left behind by removed tags
|
|
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
|
|
# Strip trailing whitespace/newlines but preserve leading content
|
|
return cleaned.rstrip()
|
|
|
|
async def _send_new_chunk(self, text: str, reply_to_id: Optional[str]) -> Optional[str]:
|
|
"""Send a new message chunk, optionally threaded to a previous message.
|
|
|
|
Returns the message_id so callers can thread subsequent chunks.
|
|
"""
|
|
text = self._clean_for_display(text)
|
|
if not text.strip():
|
|
return reply_to_id
|
|
try:
|
|
meta = dict(self.metadata) if self.metadata else {}
|
|
result = await self.adapter.send(
|
|
chat_id=self.chat_id,
|
|
content=text,
|
|
reply_to=reply_to_id,
|
|
metadata=meta,
|
|
)
|
|
if result.success and result.message_id:
|
|
self._message_id = str(result.message_id)
|
|
self._already_sent = True
|
|
self._last_sent_text = text
|
|
return str(result.message_id)
|
|
else:
|
|
self._edit_supported = False
|
|
return reply_to_id
|
|
except Exception as e:
|
|
logger.error("Stream send chunk error: %s", e)
|
|
return reply_to_id
|
|
|
|
def _visible_prefix(self) -> str:
|
|
"""Return the visible text already shown in the streamed message."""
|
|
prefix = self._last_sent_text or ""
|
|
if self.cfg.cursor and prefix.endswith(self.cfg.cursor):
|
|
prefix = prefix[:-len(self.cfg.cursor)]
|
|
return self._clean_for_display(prefix)
|
|
|
|
def _continuation_text(self, final_text: str) -> str:
|
|
"""Return only the part of final_text the user has not already seen."""
|
|
prefix = self._fallback_prefix or self._visible_prefix()
|
|
if prefix and final_text.startswith(prefix):
|
|
return final_text[len(prefix):].lstrip()
|
|
return final_text
|
|
|
|
@staticmethod
|
|
def _split_text_chunks(text: str, limit: int) -> list[str]:
|
|
"""Split text into reasonably sized chunks for fallback sends."""
|
|
if len(text) <= limit:
|
|
return [text]
|
|
chunks: list[str] = []
|
|
remaining = text
|
|
while len(remaining) > limit:
|
|
split_at = remaining.rfind("\n", 0, limit)
|
|
if split_at < limit // 2:
|
|
split_at = limit
|
|
chunks.append(remaining[:split_at])
|
|
remaining = remaining[split_at:].lstrip("\n")
|
|
if remaining:
|
|
chunks.append(remaining)
|
|
return chunks
|
|
|
|
async def _send_fallback_final(self, text: str) -> None:
|
|
"""Send the final continuation after streaming edits stop working.
|
|
|
|
Retries each chunk once on flood-control failures with a short delay.
|
|
"""
|
|
final_text = self._clean_for_display(text)
|
|
continuation = self._continuation_text(final_text)
|
|
self._fallback_final_send = False
|
|
if not continuation.strip():
|
|
# Nothing new to send — the visible partial already matches final text.
|
|
self._already_sent = True
|
|
self._final_response_sent = True
|
|
return
|
|
|
|
raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096)
|
|
safe_limit = max(500, raw_limit - 100)
|
|
chunks = self._split_text_chunks(continuation, safe_limit)
|
|
|
|
last_message_id: Optional[str] = None
|
|
last_successful_chunk = ""
|
|
sent_any_chunk = False
|
|
for chunk in chunks:
|
|
# Try sending with one retry on flood-control errors.
|
|
result = None
|
|
for attempt in range(2):
|
|
result = await self.adapter.send(
|
|
chat_id=self.chat_id,
|
|
content=chunk,
|
|
metadata=self.metadata,
|
|
)
|
|
if result.success:
|
|
break
|
|
if attempt == 0 and self._is_flood_error(result):
|
|
logger.debug(
|
|
"Flood control on fallback send, retrying in 3s"
|
|
)
|
|
await asyncio.sleep(3.0)
|
|
else:
|
|
break # non-flood error or second attempt failed
|
|
|
|
if not result or not result.success:
|
|
if sent_any_chunk:
|
|
# Some continuation text already reached the user. Suppress
|
|
# the base gateway final-send path so we don't resend the
|
|
# full response and create another duplicate.
|
|
self._already_sent = True
|
|
self._final_response_sent = True
|
|
self._message_id = last_message_id
|
|
self._last_sent_text = last_successful_chunk
|
|
self._fallback_prefix = ""
|
|
return
|
|
# No fallback chunk reached the user — allow the normal gateway
|
|
# final-send path to try one more time.
|
|
self._already_sent = False
|
|
self._message_id = None
|
|
self._last_sent_text = ""
|
|
self._fallback_prefix = ""
|
|
return
|
|
sent_any_chunk = True
|
|
last_successful_chunk = chunk
|
|
last_message_id = result.message_id or last_message_id
|
|
|
|
self._message_id = last_message_id
|
|
self._already_sent = True
|
|
self._final_response_sent = True
|
|
self._last_sent_text = chunks[-1]
|
|
self._fallback_prefix = ""
|
|
|
|
def _is_flood_error(self, result) -> bool:
|
|
"""Check if a SendResult failure is due to flood control / rate limiting."""
|
|
err = getattr(result, "error", "") or ""
|
|
err_lower = err.lower()
|
|
return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower
|
|
|
|
async def _try_strip_cursor(self) -> None:
|
|
"""Best-effort edit to remove the cursor from the last visible message.
|
|
|
|
Called when entering fallback mode so the user doesn't see a stuck
|
|
cursor (▉) in the partial message.
|
|
"""
|
|
if not self._message_id or self._message_id == "__no_edit__":
|
|
return
|
|
prefix = self._visible_prefix()
|
|
if not prefix or not prefix.strip():
|
|
return
|
|
try:
|
|
await self.adapter.edit_message(
|
|
chat_id=self.chat_id,
|
|
message_id=self._message_id,
|
|
content=prefix,
|
|
)
|
|
self._last_sent_text = prefix
|
|
except Exception:
|
|
pass # best-effort — don't let this block the fallback path
|
|
|
|
async def _send_commentary(self, text: str) -> bool:
|
|
"""Send a completed interim assistant commentary message."""
|
|
text = self._clean_for_display(text)
|
|
if not text.strip():
|
|
return False
|
|
try:
|
|
result = await self.adapter.send(
|
|
chat_id=self.chat_id,
|
|
content=text,
|
|
metadata=self.metadata,
|
|
)
|
|
if result.success:
|
|
self._already_sent = True
|
|
return True
|
|
except Exception as e:
|
|
logger.error("Commentary send error: %s", e)
|
|
return False
|
|
|
|
async def _send_or_edit(self, text: str) -> bool:
|
|
"""Send or edit the streaming message.
|
|
|
|
Returns True if the text was successfully delivered (sent or edited),
|
|
False otherwise. Callers like the overflow split loop use this to
|
|
decide whether to advance past the delivered chunk.
|
|
"""
|
|
# Strip MEDIA: directives so they don't appear as visible text.
|
|
# Media files are delivered as native attachments after the stream
|
|
# finishes (via _deliver_media_from_response in gateway/run.py).
|
|
text = self._clean_for_display(text)
|
|
# A bare streaming cursor is not meaningful user-visible content and
|
|
# can render as a stray tofu/white-box message on some clients.
|
|
visible_without_cursor = text
|
|
if self.cfg.cursor:
|
|
visible_without_cursor = visible_without_cursor.replace(self.cfg.cursor, "")
|
|
if not visible_without_cursor.strip():
|
|
return True # cursor-only / whitespace-only update
|
|
if not text.strip():
|
|
return True # nothing to send is "success"
|
|
try:
|
|
if self._message_id is not None:
|
|
if self._edit_supported:
|
|
# Skip if text is identical to what we last sent
|
|
if text == self._last_sent_text:
|
|
return True
|
|
# Edit existing message
|
|
result = await self.adapter.edit_message(
|
|
chat_id=self.chat_id,
|
|
message_id=self._message_id,
|
|
content=text,
|
|
)
|
|
if result.success:
|
|
self._already_sent = True
|
|
self._last_sent_text = text
|
|
# Successful edit — reset flood strike counter
|
|
self._flood_strikes = 0
|
|
return True
|
|
else:
|
|
# Edit failed. If this looks like flood control / rate
|
|
# limiting, use adaptive backoff: double the edit interval
|
|
# and retry on the next cycle. Only permanently disable
|
|
# edits after _MAX_FLOOD_STRIKES consecutive failures.
|
|
if self._is_flood_error(result):
|
|
self._flood_strikes += 1
|
|
self._current_edit_interval = min(
|
|
self._current_edit_interval * 2, 10.0,
|
|
)
|
|
logger.debug(
|
|
"Flood control on edit (strike %d/%d), "
|
|
"backoff interval → %.1fs",
|
|
self._flood_strikes,
|
|
self._MAX_FLOOD_STRIKES,
|
|
self._current_edit_interval,
|
|
)
|
|
if self._flood_strikes < self._MAX_FLOOD_STRIKES:
|
|
# Don't disable edits yet — just slow down.
|
|
# Update _last_edit_time so the next edit
|
|
# respects the new interval.
|
|
self._last_edit_time = time.monotonic()
|
|
return False
|
|
|
|
# Non-flood error OR flood strikes exhausted: enter
|
|
# fallback mode — send only the missing tail once the
|
|
# final response is available.
|
|
logger.debug(
|
|
"Edit failed (strikes=%d), entering fallback mode",
|
|
self._flood_strikes,
|
|
)
|
|
self._fallback_prefix = self._visible_prefix()
|
|
self._fallback_final_send = True
|
|
self._edit_supported = False
|
|
self._already_sent = True
|
|
# Best-effort: strip the cursor from the last visible
|
|
# message so the user doesn't see a stuck ▉.
|
|
await self._try_strip_cursor()
|
|
return False
|
|
else:
|
|
# Editing not supported — skip intermediate updates.
|
|
# The final response will be sent by the fallback path.
|
|
return False
|
|
else:
|
|
# First message — send new
|
|
result = await self.adapter.send(
|
|
chat_id=self.chat_id,
|
|
content=text,
|
|
metadata=self.metadata,
|
|
)
|
|
if result.success:
|
|
if result.message_id:
|
|
self._message_id = result.message_id
|
|
else:
|
|
self._edit_supported = False
|
|
self._already_sent = True
|
|
self._last_sent_text = text
|
|
if not result.message_id:
|
|
self._fallback_prefix = self._visible_prefix()
|
|
self._fallback_final_send = True
|
|
# Sentinel prevents re-entering the first-send path on
|
|
# every delta/tool boundary when platforms accept a
|
|
# message but do not return an editable message id.
|
|
self._message_id = "__no_edit__"
|
|
return True
|
|
else:
|
|
# Initial send failed — disable streaming for this session
|
|
self._edit_supported = False
|
|
return False
|
|
except Exception as e:
|
|
logger.error("Stream send/edit error: %s", e)
|
|
return False
|