mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-04 02:21:47 +00:00
fix(gateway): flush undelivered tail before segment reset to preserve streamed text (#8124)
When a streaming edit fails mid-stream (flood control, transport error) and a tool boundary arrives before the fallback threshold is reached, the pre-boundary tail in `_accumulated` was silently discarded by `_reset_segment_state`. The user saw a frozen partial message and missing words on the other side of the tool call. Flush the undelivered tail as a continuation message before the reset, computed relative to the last successfully-delivered prefix so we don't duplicate content the user already saw.
This commit is contained in:
parent
e017131403
commit
1d1e1277e4
2 changed files with 105 additions and 2 deletions
|
|
@ -430,6 +430,21 @@ class GatewayStreamConsumer:
|
||||||
# a real string like "msg_1", not "__no_edit__", so that case
|
# a real string like "msg_1", not "__no_edit__", so that case
|
||||||
# still resets and creates a fresh segment as intended.)
|
# still resets and creates a fresh segment as intended.)
|
||||||
if got_segment_break:
|
if got_segment_break:
|
||||||
|
# If the segment-break edit failed to deliver the
|
||||||
|
# accumulated content (flood control that has not yet
|
||||||
|
# promoted to fallback mode, or fallback mode itself),
|
||||||
|
# _accumulated still holds pre-boundary text the user
|
||||||
|
# never saw. Flush that tail as a continuation message
|
||||||
|
# before the reset below wipes _accumulated — otherwise
|
||||||
|
# text generated before the tool boundary is silently
|
||||||
|
# dropped (issue #8124).
|
||||||
|
if (
|
||||||
|
self._accumulated
|
||||||
|
and not current_update_visible
|
||||||
|
and self._message_id
|
||||||
|
and self._message_id != "__no_edit__"
|
||||||
|
):
|
||||||
|
await self._flush_segment_tail_on_edit_failure()
|
||||||
self._reset_segment_state(preserve_no_edit=True)
|
self._reset_segment_state(preserve_no_edit=True)
|
||||||
|
|
||||||
await asyncio.sleep(0.05) # Small yield to not busy-loop
|
await asyncio.sleep(0.05) # Small yield to not busy-loop
|
||||||
|
|
@ -620,6 +635,39 @@ class GatewayStreamConsumer:
|
||||||
err_lower = err.lower()
|
err_lower = err.lower()
|
||||||
return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower
|
return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower
|
||||||
|
|
||||||
|
async def _flush_segment_tail_on_edit_failure(self) -> None:
|
||||||
|
"""Deliver un-sent tail content before a segment-break reset.
|
||||||
|
|
||||||
|
When an edit fails (flood control, transport error) and a tool
|
||||||
|
boundary arrives before the next retry, ``_accumulated`` holds text
|
||||||
|
that was generated but never shown to the user. Without this flush,
|
||||||
|
the segment reset would discard that tail and leave a frozen cursor
|
||||||
|
in the partial message.
|
||||||
|
|
||||||
|
Sends the tail that sits after the last successfully-delivered
|
||||||
|
prefix as a new message, and best-effort strips the stuck cursor
|
||||||
|
from the previous partial message.
|
||||||
|
"""
|
||||||
|
if not self._fallback_final_send:
|
||||||
|
await self._try_strip_cursor()
|
||||||
|
visible = self._fallback_prefix or self._visible_prefix()
|
||||||
|
tail = self._accumulated
|
||||||
|
if visible and tail.startswith(visible):
|
||||||
|
tail = tail[len(visible):].lstrip()
|
||||||
|
tail = self._clean_for_display(tail)
|
||||||
|
if not tail.strip():
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
result = await self.adapter.send(
|
||||||
|
chat_id=self.chat_id,
|
||||||
|
content=tail,
|
||||||
|
metadata=self.metadata,
|
||||||
|
)
|
||||||
|
if result.success:
|
||||||
|
self._already_sent = True
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Segment-break tail flush error: %s", e)
|
||||||
|
|
||||||
async def _try_strip_cursor(self) -> None:
|
async def _try_strip_cursor(self) -> None:
|
||||||
"""Best-effort edit to remove the cursor from the last visible message.
|
"""Best-effort edit to remove the cursor from the last visible message.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -502,11 +502,13 @@ class TestSegmentBreakOnToolBoundary:
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_segment_break_clears_failed_edit_fallback_state(self):
|
async def test_segment_break_clears_failed_edit_fallback_state(self):
|
||||||
"""A tool boundary after edit failure must not duplicate the next segment."""
|
"""A tool boundary after edit failure must flush the undelivered tail
|
||||||
|
without duplicating the prefix the user already saw (#8124)."""
|
||||||
adapter = MagicMock()
|
adapter = MagicMock()
|
||||||
send_results = [
|
send_results = [
|
||||||
SimpleNamespace(success=True, message_id="msg_1"),
|
SimpleNamespace(success=True, message_id="msg_1"),
|
||||||
SimpleNamespace(success=True, message_id="msg_2"),
|
SimpleNamespace(success=True, message_id="msg_2"),
|
||||||
|
SimpleNamespace(success=True, message_id="msg_3"),
|
||||||
]
|
]
|
||||||
adapter.send = AsyncMock(side_effect=send_results)
|
adapter.send = AsyncMock(side_effect=send_results)
|
||||||
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6"))
|
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6"))
|
||||||
|
|
@ -526,7 +528,60 @@ class TestSegmentBreakOnToolBoundary:
|
||||||
await task
|
await task
|
||||||
|
|
||||||
sent_texts = [call[1]["content"] for call in adapter.send.call_args_list]
|
sent_texts = [call[1]["content"] for call in adapter.send.call_args_list]
|
||||||
assert sent_texts == ["Hello ▉", "Next segment"]
|
# The undelivered "world" tail must reach the user, and the next
|
||||||
|
# segment must not duplicate "Hello" that was already visible.
|
||||||
|
assert sent_texts == ["Hello ▉", "world", "Next segment"]
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_segment_break_after_mid_stream_edit_failure_preserves_tail(self):
|
||||||
|
"""Regression for #8124: when an earlier edit succeeded but later edits
|
||||||
|
fail (persistent flood control) and a tool boundary arrives before the
|
||||||
|
fallback threshold is reached, the pre-boundary tail must still be
|
||||||
|
delivered — not silently dropped by the segment reset."""
|
||||||
|
adapter = MagicMock()
|
||||||
|
# msg_1 for the initial partial, msg_2 for the flushed tail,
|
||||||
|
# msg_3 for the post-boundary segment.
|
||||||
|
send_results = [
|
||||||
|
SimpleNamespace(success=True, message_id="msg_1"),
|
||||||
|
SimpleNamespace(success=True, message_id="msg_2"),
|
||||||
|
SimpleNamespace(success=True, message_id="msg_3"),
|
||||||
|
]
|
||||||
|
adapter.send = AsyncMock(side_effect=send_results)
|
||||||
|
|
||||||
|
# First two edits succeed, everything after fails with flood control
|
||||||
|
# — simulating Telegram's "edit once then get rate-limited" pattern.
|
||||||
|
edit_results = [
|
||||||
|
SimpleNamespace(success=True), # "Hello world ▉" — succeeds
|
||||||
|
SimpleNamespace(success=False, error="flood_control:6.0"), # "Hello world more ▉" — flood triggered
|
||||||
|
SimpleNamespace(success=False, error="flood_control:6.0"), # finalize edit at segment break
|
||||||
|
SimpleNamespace(success=False, error="flood_control:6.0"), # cursor-strip attempt
|
||||||
|
]
|
||||||
|
adapter.edit_message = AsyncMock(side_effect=edit_results + [edit_results[-1]] * 10)
|
||||||
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
||||||
|
|
||||||
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉")
|
||||||
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
||||||
|
|
||||||
|
consumer.on_delta("Hello")
|
||||||
|
task = asyncio.create_task(consumer.run())
|
||||||
|
await asyncio.sleep(0.08)
|
||||||
|
consumer.on_delta(" world")
|
||||||
|
await asyncio.sleep(0.08)
|
||||||
|
consumer.on_delta(" more")
|
||||||
|
await asyncio.sleep(0.08)
|
||||||
|
consumer.on_delta(None) # tool boundary
|
||||||
|
consumer.on_delta("Here is the tool result.")
|
||||||
|
consumer.finish()
|
||||||
|
await task
|
||||||
|
|
||||||
|
sent_texts = [call[1]["content"] for call in adapter.send.call_args_list]
|
||||||
|
# "more" must have been delivered, not dropped.
|
||||||
|
all_text = " ".join(sent_texts)
|
||||||
|
assert "more" in all_text, (
|
||||||
|
f"Pre-boundary tail 'more' was silently dropped: sends={sent_texts}"
|
||||||
|
)
|
||||||
|
# Post-boundary text must also reach the user.
|
||||||
|
assert "Here is the tool result." in all_text
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_no_message_id_enters_fallback_mode(self):
|
async def test_no_message_id_enters_fallback_mode(self):
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue