mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix: stream consumer creates new message after tool boundaries (#5739)
When streaming was enabled on the gateway, the stream consumer created a single message at the start and kept editing it as tokens arrived. Tool progress messages were sent as separate messages below it. Since edits don't change message position on Telegram/Matrix/Discord, the final response ended up stuck above all tool progress messages — users had to scroll up past potentially dozens of tool call lines to read the answer. The agent already sends stream_delta_callback(None) at tool boundaries (before _execute_tool_calls). The stream consumer was ignoring this signal. Now it treats None as a segment break: finalizes the current message (removes cursor), resets _message_id, and the next text chunk creates a fresh message below the tool progress messages. Timeline before: [msg 1: 'Let me search...' → edits → 'Here is the answer'] ← top [msg 2: tool progress lines] ← bottom Timeline after: [msg 1: 'Let me search...'] ← top [msg 2: tool progress lines] [msg 3: 'Here is the answer'] ← bottom (visible) Reported by SkyLinx on Discord.
This commit is contained in:
parent
5a2cf280a3
commit
8dee82ea1e
2 changed files with 174 additions and 2 deletions
|
|
@ -28,6 +28,10 @@ logger = logging.getLogger("gateway.stream_consumer")
|
|||
# Sentinel to signal the stream is complete
|
||||
_DONE = object()
|
||||
|
||||
# Sentinel to signal a tool boundary — finalize current message and start a
|
||||
# new one so that subsequent text appears below tool progress messages.
|
||||
_NEW_SEGMENT = object()
|
||||
|
||||
|
||||
@dataclass
|
||||
class StreamConsumerConfig:
|
||||
|
|
@ -78,9 +82,16 @@ class GatewayStreamConsumer:
|
|||
return self._already_sent
|
||||
|
||||
def on_delta(self, text: str) -> None:
|
||||
"""Thread-safe callback — called from the agent's worker thread."""
|
||||
"""Thread-safe callback — called from the agent's worker thread.
|
||||
|
||||
When *text* is ``None``, signals a tool boundary: the current message
|
||||
is finalized and subsequent text will be sent as a new message so it
|
||||
appears below any tool-progress messages the gateway sent in between.
|
||||
"""
|
||||
if text:
|
||||
self._queue.put(text)
|
||||
elif text is None:
|
||||
self._queue.put(_NEW_SEGMENT)
|
||||
|
||||
def finish(self) -> None:
|
||||
"""Signal that the stream is complete."""
|
||||
|
|
@ -96,12 +107,16 @@ class GatewayStreamConsumer:
|
|||
while True:
|
||||
# Drain all available items from the queue
|
||||
got_done = False
|
||||
got_segment_break = False
|
||||
while True:
|
||||
try:
|
||||
item = self._queue.get_nowait()
|
||||
if item is _DONE:
|
||||
got_done = True
|
||||
break
|
||||
if item is _NEW_SEGMENT:
|
||||
got_segment_break = True
|
||||
break
|
||||
self._accumulated += item
|
||||
except queue.Empty:
|
||||
break
|
||||
|
|
@ -111,6 +126,7 @@ class GatewayStreamConsumer:
|
|||
elapsed = now - self._last_edit_time
|
||||
should_edit = (
|
||||
got_done
|
||||
or got_segment_break
|
||||
or (elapsed >= self.cfg.edit_interval
|
||||
and len(self._accumulated) > 0)
|
||||
or len(self._accumulated) >= self.cfg.buffer_threshold
|
||||
|
|
@ -133,7 +149,7 @@ class GatewayStreamConsumer:
|
|||
self._last_sent_text = ""
|
||||
|
||||
display_text = self._accumulated
|
||||
if not got_done:
|
||||
if not got_done and not got_segment_break:
|
||||
display_text += self.cfg.cursor
|
||||
|
||||
await self._send_or_edit(display_text)
|
||||
|
|
@ -145,6 +161,15 @@ class GatewayStreamConsumer:
|
|||
await self._send_or_edit(self._accumulated)
|
||||
return
|
||||
|
||||
# Tool boundary: the should_edit block above already flushed
|
||||
# accumulated text without a cursor. Reset state so the next
|
||||
# text chunk creates a fresh message below any tool-progress
|
||||
# messages the gateway sent in between.
|
||||
if got_segment_break:
|
||||
self._message_id = None
|
||||
self._accumulated = ""
|
||||
self._last_sent_text = ""
|
||||
|
||||
await asyncio.sleep(0.05) # Small yield to not busy-loop
|
||||
|
||||
except asyncio.CancelledError:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue