mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Add display.interim_assistant_messages config (enabled by default) that forwards completed assistant commentary between tool calls to the user as separate chat messages. Models already emit useful status text like 'I'll inspect the repo first.' — this surfaces it on Telegram, Discord, and other messaging platforms instead of swallowing it. Independent from tool_progress and gateway streaming. Disabled for webhooks. Uses GatewayStreamConsumer when available, falls back to direct adapter send. Tracks response_previewed to prevent double-delivery when interim message matches the final response. Also fixes: cursor not stripped from fallback prefix in stream consumer (affected continuation calculation on no-edit platforms like Signal). Cherry-picked from PR #7885 by asheriif, default changed to enabled. Fixes #5016
585 lines
26 KiB
Python
"""Gateway streaming consumer — bridges sync agent callbacks to async platform delivery.
|
|
|
|
The agent fires stream_delta_callback(text) synchronously from its worker thread.
|
|
GatewayStreamConsumer:
|
|
1. Receives deltas via on_delta() (thread-safe, sync)
|
|
2. Queues them to an asyncio task via queue.Queue
|
|
3. The async run() task buffers, rate-limits, and progressively edits
|
|
a single message on the target platform
|
|
|
|
Design: Uses the edit transport (send initial message, then editMessageText).
|
|
This is universally supported across Telegram, Discord, and Slack.
|
|
|
|
Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import queue
|
|
import re
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Any, Optional
|
|
|
|
logger = logging.getLogger("gateway.stream_consumer")
|
|
|
|
# Sentinel to signal the stream is complete
|
|
_DONE = object()
|
|
|
|
# Sentinel to signal a tool boundary — finalize current message and start a
|
|
# new one so that subsequent text appears below tool progress messages.
|
|
_NEW_SEGMENT = object()
|
|
|
|
# Queue marker for a completed assistant commentary message emitted between
|
|
# API/tool iterations (for example: "I'll inspect the repo first.").
|
|
_COMMENTARY = object()
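
# Queue item protocol (a summary of the senders defined below):
#   str                  -> delta text appended to the current segment
#   _NEW_SEGMENT         -> finalize the current message, start a fresh one
#   (_COMMENTARY, str)   -> deliver an interim assistant message verbatim
#   _DONE                -> stream complete; flush the final edit and return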


@dataclass
class StreamConsumerConfig:
    """Runtime config for a single stream consumer instance."""
    edit_interval: float = 1.0
    buffer_threshold: int = 40
    cursor: str = " ▉"
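
# Example: a slower cadence for platforms with strict edit rate limits
# (illustrative values, not tuned recommendations):
#
#     cfg = StreamConsumerConfig(edit_interval=2.0, buffer_threshold=120)
#     consumer = GatewayStreamConsumer(adapter, chat_id, cfg)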


class GatewayStreamConsumer:
    """Async consumer that progressively edits a platform message with streamed tokens.

    Usage::

        consumer = GatewayStreamConsumer(adapter, chat_id, config, metadata=metadata)
        # Pass consumer.on_delta as stream_delta_callback to AIAgent
        agent = AIAgent(..., stream_delta_callback=consumer.on_delta)
        # Start the consumer as an asyncio task
        task = asyncio.create_task(consumer.run())
        # ... run agent in thread pool ...
        consumer.finish()  # signal completion
        await task  # wait for final edit
    """

    # After this many consecutive flood-control failures, permanently disable
    # progressive edits for the remainder of the stream.
    _MAX_FLOOD_STRIKES = 3

    def __init__(
        self,
        adapter: Any,
        chat_id: str,
        config: Optional[StreamConsumerConfig] = None,
        metadata: Optional[dict] = None,
    ):
        self.adapter = adapter
        self.chat_id = chat_id
        self.cfg = config or StreamConsumerConfig()
        self.metadata = metadata
        self._queue: queue.Queue = queue.Queue()
        self._accumulated = ""
        self._message_id: Optional[str] = None
        self._already_sent = False
        self._edit_supported = True  # Set to False once progressive edits stop working
        self._last_edit_time = 0.0
        self._last_sent_text = ""  # Track last-sent text to skip redundant edits
        self._fallback_final_send = False
        self._fallback_prefix = ""
        self._flood_strikes = 0  # Consecutive flood-control edit failures
        self._current_edit_interval = self.cfg.edit_interval  # Adaptive backoff
        self._final_response_sent = False
        # Length of _accumulated at the last flush; buffer_threshold compares
        # against text buffered *since* that flush (see run()).
        self._flushed_len = 0

    @property
    def already_sent(self) -> bool:
        """True if at least one message was sent or edited during the run."""
        return self._already_sent

    @property
    def final_response_sent(self) -> bool:
        """True when the stream consumer delivered the final assistant reply."""
        return self._final_response_sent

    def on_segment_break(self) -> None:
        """Finalize the current stream segment and start a fresh message."""
        self._queue.put(_NEW_SEGMENT)

    def on_commentary(self, text: str) -> None:
        """Queue a completed interim assistant commentary message."""
        if text:
            self._queue.put((_COMMENTARY, text))
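
    # The gateway calls this between API/tool iterations with completed
    # assistant commentary, for example (the text quoted in the commit
    # message):
    #
    #     consumer.on_commentary("I'll inspect the repo first.")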

    def _reset_segment_state(self, *, preserve_no_edit: bool = False) -> None:
        if preserve_no_edit and self._message_id == "__no_edit__":
            return
        self._message_id = None
        self._accumulated = ""
        self._last_sent_text = ""
        self._fallback_final_send = False
        self._fallback_prefix = ""
        self._flushed_len = 0

    def on_delta(self, text: str) -> None:
        """Thread-safe callback — called from the agent's worker thread.

        When *text* is ``None``, signals a tool boundary: the current message
        is finalized and subsequent text will be sent as a new message so it
        appears below any tool-progress messages the gateway sent in between.
        """
        if text:
            self._queue.put(text)
        elif text is None:
            self.on_segment_break()

    def finish(self) -> None:
        """Signal that the stream is complete."""
        self._queue.put(_DONE)

    async def run(self) -> None:
        """Async task that drains the queue and edits the platform message."""
        # Platform message length limit — leave room for cursor + formatting
        _raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096)
        _safe_limit = max(500, _raw_limit - len(self.cfg.cursor) - 100)

        try:
            while True:
                # Drain all available items from the queue
                got_done = False
                got_segment_break = False
                commentary_text = None
                while True:
                    try:
                        item = self._queue.get_nowait()
                        if item is _DONE:
                            got_done = True
                            break
                        if item is _NEW_SEGMENT:
                            got_segment_break = True
                            break
                        if isinstance(item, tuple) and len(item) == 2 and item[0] is _COMMENTARY:
                            commentary_text = item[1]
                            break
                        self._accumulated += item
                    except queue.Empty:
                        break

                # Decide whether to flush an edit. The buffer threshold counts
                # text buffered *since the last flush*: comparing against the
                # full accumulated length would fire on every loop iteration
                # once the segment passed the threshold, bypassing the
                # (adaptive) edit interval entirely.
                now = time.monotonic()
                elapsed = now - self._last_edit_time
                should_edit = (
                    got_done
                    or got_segment_break
                    or commentary_text is not None
                    or (elapsed >= self._current_edit_interval
                        and self._accumulated)
                    or len(self._accumulated) - self._flushed_len >= self.cfg.buffer_threshold
                )

                current_update_visible = False
                if should_edit and self._accumulated:
                    # Split overflow: if accumulated text exceeds the platform
                    # limit, split into properly sized chunks.
                    if (
                        len(self._accumulated) > _safe_limit
                        and self._message_id is None
                    ):
                        # No existing message to edit (first message or after a
                        # segment break). Use truncate_message — the same
                        # helper the non-streaming path uses — to split with
                        # proper word/code-fence boundaries and chunk
                        # indicators like "(1/2)".
                        chunks = self.adapter.truncate_message(
                            self._accumulated, _safe_limit
                        )
                        for chunk in chunks:
                            await self._send_new_chunk(chunk, self._message_id)
                        self._accumulated = ""
                        self._last_sent_text = ""
                        self._flushed_len = 0
                        self._last_edit_time = time.monotonic()
                        if got_done:
                            self._final_response_sent = self._already_sent
                            return
                        if got_segment_break:
                            self._message_id = None
                            self._fallback_final_send = False
                            self._fallback_prefix = ""
                            continue

                    # Existing message: edit it with the first chunk, then
                    # start a new message for the overflow remainder.
                    while (
                        len(self._accumulated) > _safe_limit
                        and self._message_id is not None
                        and self._edit_supported
                    ):
                        split_at = self._accumulated.rfind("\n", 0, _safe_limit)
                        if split_at < _safe_limit // 2:
                            split_at = _safe_limit
                        chunk = self._accumulated[:split_at]
                        ok = await self._send_or_edit(chunk)
                        if self._fallback_final_send or not ok:
                            # Edit failed (or backed off due to flood control)
                            # while attempting to split an oversized message.
                            # Keep the full accumulated text intact so the
                            # fallback final-send path can deliver the remaining
                            # continuation without dropping content.
                            break
                        self._accumulated = self._accumulated[split_at:].lstrip("\n")
                        self._message_id = None
                        self._last_sent_text = ""

                    display_text = self._accumulated
                    if not got_done and not got_segment_break and commentary_text is None:
                        display_text += self.cfg.cursor

                    current_update_visible = await self._send_or_edit(display_text)
                    self._flushed_len = len(self._accumulated)
                    self._last_edit_time = time.monotonic()

                if got_done:
                    # Final edit without cursor. If progressive editing failed
                    # mid-stream, send a single continuation/fallback message
                    # here instead of letting the base gateway path send the
                    # full response again.
                    if self._accumulated:
                        if self._fallback_final_send:
                            await self._send_fallback_final(self._accumulated)
                        elif current_update_visible:
                            self._final_response_sent = True
                        elif self._message_id:
                            self._final_response_sent = await self._send_or_edit(self._accumulated)
                        elif not self._already_sent:
                            self._final_response_sent = await self._send_or_edit(self._accumulated)
                    return

                if commentary_text is not None:
                    self._reset_segment_state()
                    await self._send_commentary(commentary_text)
                    self._last_edit_time = time.monotonic()
                    self._reset_segment_state()

                # Tool boundary: reset message state so the next text chunk
                # creates a fresh message below any tool-progress messages.
                #
                # Exception: when _message_id is "__no_edit__" the platform
                # never returned a real message ID (e.g. Signal, webhook with
                # github_comment delivery). Resetting to None would re-enter
                # the "first send" path on every tool boundary and post one
                # platform message per tool call — that is what caused 155
                # comments under a single PR. Instead, preserve the sentinel
                # so the full continuation is delivered once via
                # _send_fallback_final.
                # (When editing fails mid-stream due to flood control the id is
                # a real string like "msg_1", not "__no_edit__", so that case
                # still resets and creates a fresh segment as intended.)
                if got_segment_break:
                    self._reset_segment_state(preserve_no_edit=True)

                await asyncio.sleep(0.05)  # Small yield to not busy-loop

        except asyncio.CancelledError:
            # Best-effort final edit on cancellation
            if self._accumulated and self._message_id:
                try:
                    await self._send_or_edit(self._accumulated)
                except Exception:
                    pass
        except Exception as e:
            logger.error("Stream consumer error: %s", e)

    # Pattern to strip MEDIA:<path> tags (including optional surrounding quotes).
    # Matches the simple cleanup regex used by the non-streaming path in
    # gateway/platforms/base.py for post-processing.
    _MEDIA_RE = re.compile(r'''[`"']?MEDIA:\s*\S+[`"']?''')

    @staticmethod
    def _clean_for_display(text: str) -> str:
        """Strip MEDIA: directives and internal markers from text before display.

        The streaming path delivers raw text chunks that may include
        ``MEDIA:<path>`` tags and ``[[audio_as_voice]]`` directives meant for
        the platform adapter's post-processing. The actual media files are
        delivered separately via ``_deliver_media_from_response()`` after the
        stream finishes — we just need to hide the raw directives from the
        user.
        """
        if "MEDIA:" not in text and "[[audio_as_voice]]" not in text:
            return text
        cleaned = text.replace("[[audio_as_voice]]", "")
        cleaned = GatewayStreamConsumer._MEDIA_RE.sub("", cleaned)
        # Collapse excessive blank lines left behind by removed tags
        cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
        # Strip trailing whitespace/newlines but preserve leading content
        return cleaned.rstrip()
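
    # Worked example (hand-traced against the regex above):
    #     _clean_for_display("Done. MEDIA:/tmp/plot.png") -> "Done."
    # The tag and its path are removed, then trailing whitespace is stripped.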

    async def _send_new_chunk(self, text: str, reply_to_id: Optional[str]) -> Optional[str]:
        """Send a new message chunk, optionally threaded to a previous message.

        Returns the message_id so callers can thread subsequent chunks.
        """
        text = self._clean_for_display(text)
        if not text.strip():
            return reply_to_id
        try:
            meta = dict(self.metadata) if self.metadata else {}
            result = await self.adapter.send(
                chat_id=self.chat_id,
                content=text,
                reply_to=reply_to_id,
                metadata=meta,
            )
            if result.success and result.message_id:
                self._message_id = str(result.message_id)
                self._already_sent = True
                self._last_sent_text = text
                return str(result.message_id)
            else:
                self._edit_supported = False
                return reply_to_id
        except Exception as e:
            logger.error("Stream send chunk error: %s", e)
            return reply_to_id

    def _visible_prefix(self) -> str:
        """Return the visible text already shown in the streamed message."""
        prefix = self._last_sent_text or ""
        if self.cfg.cursor and prefix.endswith(self.cfg.cursor):
            prefix = prefix[:-len(self.cfg.cursor)]
        return self._clean_for_display(prefix)
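
    # Stripping the trailing cursor before returning is what keeps
    # _continuation_text() accurate on no-edit platforms such as Signal:
    # with the cursor left in, the prefix never matches the final text and
    # the full response would be resent (the "cursor not stripped from
    # fallback prefix" fix noted in the commit message).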

    def _continuation_text(self, final_text: str) -> str:
        """Return only the part of final_text the user has not already seen."""
        prefix = self._fallback_prefix or self._visible_prefix()
        if prefix and final_text.startswith(prefix):
            return final_text[len(prefix):].lstrip()
        return final_text
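
    # Example: with visible prefix "Hello" and final text "Hello world!",
    # this returns "world!" (the leading space is stripped by lstrip()).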

    @staticmethod
    def _split_text_chunks(text: str, limit: int) -> list[str]:
        """Split text into reasonably sized chunks for fallback sends."""
        if len(text) <= limit:
            return [text]
        chunks: list[str] = []
        remaining = text
        while len(remaining) > limit:
            split_at = remaining.rfind("\n", 0, limit)
            if split_at < limit // 2:
                split_at = limit
            chunks.append(remaining[:split_at])
            remaining = remaining[split_at:].lstrip("\n")
        if remaining:
            chunks.append(remaining)
        return chunks
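
    # Hand-traced examples: the split prefers the last newline inside the
    # window and falls back to a hard cut when that newline sits in the
    # first half of the window (or is absent):
    #     _split_text_chunks("a\nb\nc", limit=3) -> ["a", "b\nc"]
    #     _split_text_chunks("abcdef", limit=4)  -> ["abcd", "ef"]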

    async def _send_fallback_final(self, text: str) -> None:
        """Send the final continuation after streaming edits stop working.

        Retries each chunk once on flood-control failures with a short delay.
        """
        final_text = self._clean_for_display(text)
        continuation = self._continuation_text(final_text)
        self._fallback_final_send = False
        if not continuation.strip():
            # Nothing new to send — the visible partial already matches final text.
            self._already_sent = True
            self._final_response_sent = True
            return

        raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096)
        safe_limit = max(500, raw_limit - 100)
        chunks = self._split_text_chunks(continuation, safe_limit)

        last_message_id: Optional[str] = None
        last_successful_chunk = ""
        sent_any_chunk = False
        for chunk in chunks:
            # Try sending with one retry on flood-control errors.
            result = None
            for attempt in range(2):
                result = await self.adapter.send(
                    chat_id=self.chat_id,
                    content=chunk,
                    metadata=self.metadata,
                )
                if result.success:
                    break
                if attempt == 0 and self._is_flood_error(result):
                    logger.debug(
                        "Flood control on fallback send, retrying in 3s"
                    )
                    await asyncio.sleep(3.0)
                else:
                    break  # non-flood error or second attempt failed

            if not result or not result.success:
                if sent_any_chunk:
                    # Some continuation text already reached the user. Suppress
                    # the base gateway final-send path so we don't resend the
                    # full response and create another duplicate.
                    self._already_sent = True
                    self._final_response_sent = True
                    self._message_id = last_message_id
                    self._last_sent_text = last_successful_chunk
                    self._fallback_prefix = ""
                    return
                # No fallback chunk reached the user — allow the normal gateway
                # final-send path to try one more time.
                self._already_sent = False
                self._message_id = None
                self._last_sent_text = ""
                self._fallback_prefix = ""
                return
            sent_any_chunk = True
            last_successful_chunk = chunk
            last_message_id = result.message_id or last_message_id

        self._message_id = last_message_id
        self._already_sent = True
        self._final_response_sent = True
        self._last_sent_text = chunks[-1]
        self._fallback_prefix = ""

    def _is_flood_error(self, result) -> bool:
        """Check if a SendResult failure is due to flood control / rate limiting."""
        err = getattr(result, "error", "") or ""
        err_lower = err.lower()
        return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower

    async def _try_strip_cursor(self) -> None:
        """Best-effort edit to remove the cursor from the last visible message.

        Called when entering fallback mode so the user doesn't see a stuck
        cursor (▉) in the partial message.
        """
        if not self._message_id or self._message_id == "__no_edit__":
            return
        prefix = self._visible_prefix()
        if not prefix or not prefix.strip():
            return
        try:
            await self.adapter.edit_message(
                chat_id=self.chat_id,
                message_id=self._message_id,
                content=prefix,
            )
            self._last_sent_text = prefix
        except Exception:
            pass  # best-effort — don't let this block the fallback path

    async def _send_commentary(self, text: str) -> bool:
        """Send a completed interim assistant commentary message."""
        text = self._clean_for_display(text)
        if not text.strip():
            return False
        try:
            result = await self.adapter.send(
                chat_id=self.chat_id,
                content=text,
                metadata=self.metadata,
            )
            if result.success:
                self._already_sent = True
                return True
        except Exception as e:
            logger.error("Commentary send error: %s", e)
        return False

    async def _send_or_edit(self, text: str) -> bool:
        """Send or edit the streaming message.

        Returns True if the text was successfully delivered (sent or edited),
        False otherwise. Callers like the overflow split loop use this to
        decide whether to advance past the delivered chunk.
        """
        # Strip MEDIA: directives so they don't appear as visible text.
        # Media files are delivered as native attachments after the stream
        # finishes (via _deliver_media_from_response in gateway/run.py).
        text = self._clean_for_display(text)
        if not text.strip():
            return True  # nothing to send is "success"
        try:
            if self._message_id is not None:
                if self._edit_supported:
                    # Skip if text is identical to what we last sent
                    if text == self._last_sent_text:
                        return True
                    # Edit existing message
                    result = await self.adapter.edit_message(
                        chat_id=self.chat_id,
                        message_id=self._message_id,
                        content=text,
                    )
                    if result.success:
                        self._already_sent = True
                        self._last_sent_text = text
                        # Successful edit — reset flood strike counter
                        self._flood_strikes = 0
                        return True
                    else:
                        # Edit failed. If this looks like flood control / rate
                        # limiting, use adaptive backoff: double the edit interval
                        # and retry on the next cycle. Only permanently disable
                        # edits after _MAX_FLOOD_STRIKES consecutive failures.
                        if self._is_flood_error(result):
                            self._flood_strikes += 1
                            self._current_edit_interval = min(
                                self._current_edit_interval * 2, 10.0,
                            )
                            logger.debug(
                                "Flood control on edit (strike %d/%d), "
                                "backoff interval → %.1fs",
                                self._flood_strikes,
                                self._MAX_FLOOD_STRIKES,
                                self._current_edit_interval,
                            )
                            if self._flood_strikes < self._MAX_FLOOD_STRIKES:
                                # Don't disable edits yet — just slow down.
                                # Update _last_edit_time so the next edit
                                # respects the new interval.
                                self._last_edit_time = time.monotonic()
                                return False

                        # Non-flood error OR flood strikes exhausted: enter
                        # fallback mode — send only the missing tail once the
                        # final response is available.
                        logger.debug(
                            "Edit failed (strikes=%d), entering fallback mode",
                            self._flood_strikes,
                        )
                        self._fallback_prefix = self._visible_prefix()
                        self._fallback_final_send = True
                        self._edit_supported = False
                        self._already_sent = True
                        # Best-effort: strip the cursor from the last visible
                        # message so the user doesn't see a stuck ▉.
                        await self._try_strip_cursor()
                        return False
                else:
                    # Editing not supported — skip intermediate updates.
                    # The final response will be sent by the fallback path.
                    return False
            else:
                # First message — send new
                result = await self.adapter.send(
                    chat_id=self.chat_id,
                    content=text,
                    metadata=self.metadata,
                )
                if result.success:
                    if result.message_id:
                        self._message_id = result.message_id
                    else:
                        self._edit_supported = False
                    self._already_sent = True
                    self._last_sent_text = text
                    if not result.message_id:
                        self._fallback_prefix = self._visible_prefix()
                        self._fallback_final_send = True
                        # Sentinel prevents re-entering the first-send path on
                        # every delta/tool boundary when platforms accept a
                        # message but do not return an editable message id.
                        self._message_id = "__no_edit__"
                    return True
                else:
                    # Initial send failed — disable streaming for this session
                    self._edit_supported = False
                    return False
        except Exception as e:
            logger.error("Stream send/edit error: %s", e)
            return False