fix(gateway): recover partial Telegram overflow streams

This commit is contained in:
GodsBoy 2026-06-09 12:28:43 +02:00 committed by Teknium
parent 88fcf0c8c0
commit 590b3c0d7e
6 changed files with 453 additions and 6 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 428 KiB

View file

@ -0,0 +1,240 @@
---
title: "fix: Prevent Telegram streamed replies from ending after first overflow chunk"
status: active
date: 2026-06-09
type: fix
target_repo: hermes-agent
origin: user-reported Telegram topic screenshot
---
# fix: Prevent Telegram streamed replies from ending after first overflow chunk
## Summary
Fix a Telegram gateway bug where a long streamed assistant reply can appear to stop mid-answer in a topic after the first overflow chunk. The reported screenshot shows a long Hermes response in the `Nehemiah - Coding` Telegram topic ending at `- The visible tool-call summary`, followed by the user noting that the previous message did not finish streaming to that Telegram topic.
The plan targets the streamed edit overflow path, not general model generation. A completed assistant response must either reach Telegram in full across all continuation messages or leave enough state for the gateway fallback path to deliver the remaining content instead of marking the turn complete after a partial delivery.
---
## Problem Frame
Telegram limits message text to 4096 UTF-16 code units. Hermes streams gateway responses by editing a message and, when a streamed message grows past the limit, splitting the overflow into additional Telegram messages. The adapter already has a split-and-deliver path for oversized edits, but the partial-continuation failure contract is weak: if chunk 1 is edited successfully and a later continuation fails, the adapter can still report success for the operation. The stream consumer may then mark the final response delivered even though the visible topic only contains the first part.
This is especially visible in Telegram forum topics because a long final response can be split below tool-progress bubbles, and a missing continuation looks exactly like the stream stopped mid-answer.
---
## Requirements
- R1. Long streamed Telegram replies must preserve all final content across overflow chunks.
- R2. If any continuation chunk fails after the first overflow edit lands, the gateway must not mark the final response as fully delivered.
- R3. Continuation chunks must remain routed to the same Telegram topic/thread as the original response.
- R4. The fix must avoid duplicate full-answer sends when all overflow chunks were delivered successfully.
- R5. Tests must cover the reported failure shape: a final streamed reply that exceeds Telegram's limit, succeeds on the first edit, fails on a continuation, and must not be treated as complete.
---
## Key Technical Decisions
- Treat overflow delivery as all-or-not-complete. `_edit_overflow_split` should only return a successful final-delivery result when every planned chunk reaches Telegram. Partial delivery is a distinct outcome that downstream code can recover from.
- Carry partial-overflow metadata through `SendResult.raw_response` rather than adding a new public dataclass field unless implementation proves the existing result shape is insufficient. The stream consumer already inspects `SendResult` after adapter edits, so a small raw response contract can keep the change contained.
- Make the stream consumer responsible for final-delivery truth. The adapter knows which chunks landed, but the consumer owns `_final_response_sent`, `_final_content_delivered`, `_fallback_prefix`, and fallback final-send behaviour.
- Keep routing inside Telegram adapter helpers. Continuation sends should continue to use `_thread_kwargs_for_send(...)` with metadata-derived `message_thread_id` and reply anchors so forum topic behaviour stays consistent.
---
## High-Level Technical Design
```mermaid
sequenceDiagram
participant C as GatewayStreamConsumer
participant T as TelegramAdapter.edit_message
participant B as Telegram Bot API
C->>T: finalize/edit long accumulated response
T->>B: edit original message with chunk 1
loop remaining chunks
T->>B: send continuation in same topic/thread
end
alt all chunks delivered
T-->>C: success, last message id, continuation ids
C->>C: mark final response delivered
else any continuation failed
T-->>C: partial overflow failure with delivered prefix metadata
C->>C: do not mark final delivered
C->>B: fallback sends missing tail or full final response safely
end
```
---
## Implementation Units
### U1. Add a partial-overflow contract for Telegram edit splits
**Goal:** Make `TelegramAdapter._edit_overflow_split` distinguish complete overflow delivery from partial delivery.
**Requirements:** R1, R2, R4
**Dependencies:** None
**Files:**
- `gateway/platforms/telegram.py`
- `tests/gateway/test_telegram_send.py` or the existing Telegram adapter test module that already covers `edit_message` overflow behaviour
**Approach:**
- Keep the successful path unchanged when every chunk is delivered: return `SendResult(success=True, message_id=<last chunk>, continuation_message_ids=(...))`.
- When a continuation fails after the first edit, return a result that clearly indicates partial delivery instead of plain success. Prefer `success=False`, `retryable=True`, and `raw_response` metadata such as delivered chunk count, total chunk count, last delivered message id, and the visible delivered prefix.
- Preserve logging, but do not rely on logs as the only signal. The caller must be able to tell partial delivery happened.
- Ensure the first edited chunk and all successful continuation chunks still include the existing Markdown/plain-text fallback behaviour.
**Patterns to follow:**
- Existing overflow handling in `TelegramAdapter.edit_message` and `_edit_overflow_split`.
- Existing `SendResult` semantics in `gateway/platforms/base.py`, especially `retryable`, `raw_response`, and `continuation_message_ids`.
**Test scenarios:**
- Oversized finalized edit where all continuations succeed returns success, the last continuation id, and all continuation ids.
- Oversized finalized edit where the first continuation send fails returns a partial-overflow failure and does not report success.
- Oversized finalized edit where one continuation succeeds and a later continuation fails reports the last delivered continuation id and delivered count in raw metadata.
- A continuation MarkdownV2 formatting failure still retries plain text before being treated as a delivery failure.
**Verification:** Adapter tests prove complete overflow remains successful and partial overflow is observable by the caller.
### U2. Teach the stream consumer to recover from partial overflow
**Goal:** Ensure a partial Telegram overflow does not set `_final_response_sent` or `_final_content_delivered` unless the full response reached the user.
**Requirements:** R1, R2, R4, R5
**Dependencies:** U1
**Files:**
- `gateway/stream_consumer.py`
- `tests/gateway/test_stream_consumer.py` or a focused new `tests/gateway/test_stream_consumer_telegram_overflow.py`
**Approach:**
- In `_send_or_edit`, when `adapter.edit_message(...)` returns a partial-overflow failure, update consumer state to reflect the last visible prefix/message and enter fallback delivery for the missing content.
- Avoid treating `_already_sent` as final delivery. A partial visible message can be true while final delivery is false.
- Use the delivered-prefix metadata if available so `_send_fallback_final(...)` sends only the missing tail. If implementation finds the prefix is unreliable after Markdown formatting, prefer sending the complete final response as a fresh fallback message rather than silently dropping the tail.
- Keep the existing success handling for `continuation_message_ids` when the adapter delivered all chunks.
**Patterns to follow:**
- Existing fallback mode in `GatewayStreamConsumer._send_or_edit` and `_send_fallback_final`.
- Existing comments around `_final_response_sent`, `_final_content_delivered`, and `_fallback_prefix` for prior partial-delivery regressions.
**Test scenarios:**
- A final streamed response that overflows and receives a complete-success edit split sets final-delivery flags and does not invoke fallback.
- A final streamed response whose adapter reports partial overflow does not set final-delivery flags immediately.
- After partial overflow, fallback delivery sends the remaining tail and then marks final content delivered only if the fallback send succeeds.
- If fallback delivery also fails, the consumer leaves final-delivery false so the gateway's non-streaming final-send safety path can still run.
**Verification:** Stream consumer tests reproduce the screenshot shape by simulating first chunk visible and continuation failure, then assert the final answer is not suppressed.
### U3. Preserve Telegram topic/thread routing for overflow and fallback continuations
**Goal:** Ensure overflow recovery messages land in the same Telegram forum topic or DM topic fallback context.
**Requirements:** R3
**Dependencies:** U1, U2
**Files:**
- `gateway/platforms/telegram.py`
- `gateway/stream_consumer.py`
- `tests/gateway/test_stream_consumer_thread_routing.py`
- Relevant Telegram adapter routing tests, if existing coverage is closer there
**Approach:**
- Keep passing `metadata` through every overflow continuation and fallback send.
- Keep reply anchors where valid, but do not let a missing reply anchor drop the `message_thread_id` for normal forum topics.
- For private DM topic fallback metadata, preserve the existing stricter anchor behaviour documented in the adapter comments.
**Patterns to follow:**
- `TelegramAdapter._thread_kwargs_for_send(...)`.
- Existing tests around Telegram topic recovery and stream consumer thread routing.
**Test scenarios:**
- Overflow continuations include `message_thread_id` for a forum topic.
- A continuation retry after `reply message not found` keeps forum topic routing when allowed.
- Partial-overflow fallback sends receive the same metadata passed to the original stream consumer.
**Verification:** Thread-routing assertions inspect fake bot calls and confirm all continuation/fallback messages carry the expected topic metadata.
### U4. Add issue evidence and PR body traceability
**Goal:** Make the upstream issue and PR clearly trace the user-visible bug and verification evidence.
**Requirements:** R5
**Dependencies:** U1, U2, U3
**Files:**
- GitHub issue body created via `gh issue create`
- PR body using `.github/PULL_REQUEST_TEMPLATE.md`
**Approach:**
- Create a GitHub issue with the screenshot evidence: the long message in the `Nehemiah - Coding` Telegram topic stops at `- The visible tool-call summary`, and the user's reply says the previous message did not finish streaming to that Telegram topic.
- Reference affected component as Gateway and platform as Telegram.
- In the PR body, link the issue with `Fixes #...`, describe the split-delivery contract change, and include the screenshot or attach it if GitHub upload is available.
- Follow `CONTRIBUTING.md` and the repository PR template exactly.
**Patterns to follow:**
- `.github/ISSUE_TEMPLATE/bug_report.yml`
- `.github/PULL_REQUEST_TEMPLATE.md`
**Test scenarios:**
- Test expectation: none, this is tracker and PR documentation work.
**Verification:** The GitHub issue exists with screenshot evidence or an explicit screenshot reference, and the PR body links the issue and lists the tests run.
---
## Scope Boundaries
### In Scope
- Telegram streamed response overflow splitting and recovery.
- Stream consumer final-delivery truth for partial overflow delivery.
- Topic/thread metadata preservation for overflow and fallback continuation sends.
- Focused unit tests around adapter and stream consumer behaviour.
### Out of Scope
- Changing model streaming semantics in `run_agent.py`.
- Reworking Telegram draft streaming, which is DM-only and not the forum-topic path in the screenshot.
- Changing general platform message splitting for Discord, Slack, WhatsApp, or Matrix unless a shared helper must be corrected for the Telegram fix.
- Altering tool-progress display settings or terminal progress rendering.
### Deferred to Follow-Up Work
- Broader observability for gateway delivery completeness across all messaging platforms.
- A user-facing resend/recover command for a previous truncated response.
---
## Risks & Mitigations
- Risk: fallback recovery duplicates already-visible first chunks. Mitigation: use delivered-prefix metadata where reliable and add tests for no-duplicate complete-success behaviour.
- Risk: preserving forum topic routing while dropping invalid reply anchors is easy to regress. Mitigation: include fake bot call assertions for `message_thread_id` and reply behaviour.
- Risk: MarkdownV2 formatting can alter visible/raw prefix comparisons. Mitigation: keep fallback conservative; duplicate content is preferable to silently missing content, but tests should keep the common path tail-only.
---
## Sources & Research
- User-provided screenshot at `/root/.hermes/image_cache/img_f664e68f6ddf.jpg`.
- `gateway/stream_consumer.py` streamed edit, overflow, fallback, and final-delivery state handling.
- `gateway/platforms/telegram.py` Telegram send/edit overflow splitting and topic routing helpers.
- `gateway/platforms/base.py` `SendResult` contract and shared message chunking helper.
- `tests/gateway/test_stream_consumer.py`, `tests/gateway/test_stream_consumer_thread_routing.py`, and Telegram adapter tests for focused regression coverage.
---
## Verification Strategy
- Run focused Telegram adapter overflow tests.
- Run focused stream consumer overflow/fallback tests.
- Run topic-routing tests affected by metadata changes.
- Run the gateway test subset around Telegram send/edit, stream consumer, and run progress if touched.
- Before PR creation, ensure `git diff` contains only the plan, implementation, tests, and PR/issue-relevant documentation for this bug.

View file

@ -1545,6 +1545,13 @@ class SendResult:
message_id: Optional[str] = None
error: Optional[str] = None
raw_response: Any = None
# Adapter-specific metadata. Cross-layer contracts that affect delivery
# semantics must be documented at the producer and consumer sites. Current
# known contract: Telegram edit overflow partials set
# raw_response["partial_overflow"] with delivered_chunks, total_chunks,
# last_message_id, delivered_prefix, and continuation_message_ids so the
# stream consumer can send the missing tail instead of marking a clipped
# response complete.
retryable: bool = False # True for transient connection errors — base will retry automatically
# When the adapter had to split an oversized payload across multiple
# platform messages (e.g. Telegram edit_message overflow split-and-deliver),

View file

@ -2379,6 +2379,7 @@ class TelegramAdapter(BasePlatformAdapter):
# are already correctly sized). Best-effort MarkdownV2 with plain
# fallback, mirroring send().
continuation_ids: list[str] = []
delivered_chunks = [first_chunk]
prev_id = message_id
thread_id = self._metadata_thread_id(metadata)
for chunk in chunks[1:]:
@ -2442,17 +2443,37 @@ class TelegramAdapter(BasePlatformAdapter):
break
if sent_msg is None:
# Continuation failed — the user has chunk 1 + however many
# continuations succeeded. Report success with what we got
# so the stream consumer knows the edit landed; the
# remaining tail is lost on this attempt and the next
# streaming tick may retry.
# continuations succeeded, but NOT the full response. Do not
# report success: the stream consumer treats a successful edit
# as final delivery on got_done, which would suppress fallback
# delivery and leave the Telegram topic clipped after the last
# delivered chunk.
logger.warning(
"[%s] Overflow split: stopped at %d/%d chunks delivered",
self.name, 1 + len(continuation_ids), len(chunks),
)
break
delivered_prefix = "".join(
re.sub(r" \(\d+/\d+\)$", "", delivered)
for delivered in delivered_chunks
)
return SendResult(
success=False,
message_id=prev_id,
error="overflow_continuation_failed",
retryable=True,
raw_response={
"partial_overflow": True,
"delivered_chunks": 1 + len(continuation_ids),
"total_chunks": len(chunks),
"last_message_id": prev_id,
"delivered_prefix": delivered_prefix,
"continuation_message_ids": tuple(continuation_ids),
},
continuation_message_ids=tuple(continuation_ids),
)
new_id = str(getattr(sent_msg, "message_id", "")) or prev_id
continuation_ids.append(new_id)
delivered_chunks.append(chunk)
prev_id = new_id
last_id = continuation_ids[-1] if continuation_ids else message_id

View file

@ -149,6 +149,10 @@ class GatewayStreamConsumer:
self._last_sent_text = "" # Track last-sent text to skip redundant edits
self._fallback_final_send = False
self._fallback_prefix = ""
# True when fallback is sending only the missing tail after a partial
# Telegram overflow delivery. In that case the already-visible prefix
# is intentional content, not a stale preview to delete.
self._fallback_preserve_partial_messages = False
self._flood_strikes = 0 # Consecutive flood-control edit failures
self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff
self._final_response_sent = False
@ -261,6 +265,7 @@ class GatewayStreamConsumer:
self._last_sent_text = ""
self._fallback_final_send = False
self._fallback_prefix = ""
self._fallback_preserve_partial_messages = False
# #29346: a tool/segment boundary means what we delivered was an interim
# preamble, not the final answer — clear the flags so a premature setter
# can't fool the gateway. Safe: got_done returns before any reset, and
@ -871,7 +876,11 @@ class GatewayStreamConsumer:
# implement ``delete_message``, the delete fails (flood control still
# active, bot lacks permission, message too old to delete), the
# partial remains but at least the full answer was delivered.
if stale_message_id and stale_message_id != last_message_id:
if (
stale_message_id
and stale_message_id != last_message_id
and not self._fallback_preserve_partial_messages
):
delete_fn = getattr(self.adapter, "delete_message", None)
if delete_fn is not None:
try:
@ -888,6 +897,7 @@ class GatewayStreamConsumer:
self._final_content_delivered = True
self._last_sent_text = chunks[-1]
self._fallback_prefix = ""
self._fallback_preserve_partial_messages = False
def _is_flood_error(self, result) -> bool:
"""Check if a SendResult failure is due to flood control / rate limiting."""
@ -1274,6 +1284,35 @@ class GatewayStreamConsumer:
self._flood_strikes = 0
return True
else:
raw_response = getattr(result, "raw_response", None)
if isinstance(raw_response, dict) and raw_response.get("partial_overflow"):
# Telegram edited/sent one or more overflow chunks,
# but not the complete response. Preserve the
# visible prefix so the got_done fallback sends the
# missing tail instead of marking a clipped topic
# reply as final delivery.
self._message_id = str(
raw_response.get("last_message_id")
or result.message_id
or self._message_id
)
delivered_prefix = raw_response.get("delivered_prefix")
if isinstance(delivered_prefix, str) and delivered_prefix:
self._last_sent_text = delivered_prefix
self._fallback_prefix = delivered_prefix
self._fallback_preserve_partial_messages = text.startswith(
delivered_prefix
)
else:
self._fallback_prefix = self._visible_prefix()
self._fallback_preserve_partial_messages = False
self._fallback_final_send = True
self._edit_supported = False
self._already_sent = True
if getattr(result, "continuation_message_ids", ()):
self._notify_new_message()
return False
# Edit failed. If this looks like flood control / rate
# limiting, use adaptive backoff: double the edit interval
# and retry on the next cycle. Only permanently disable

View file

@ -0,0 +1,140 @@
"""Regression coverage for partial Telegram overflow delivery."""
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from gateway.config import PlatformConfig
from gateway.platforms.base import SendResult
from gateway.platforms.telegram import TelegramAdapter
from gateway.stream_consumer import GatewayStreamConsumer
def _message(message_id: int | str) -> SimpleNamespace:
return SimpleNamespace(message_id=message_id)
@pytest.fixture
def telegram_adapter() -> TelegramAdapter:
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="fake-token"))
adapter._bot = MagicMock()
object.__setattr__(adapter, "MAX_MESSAGE_LENGTH", 160)
return adapter
@pytest.mark.asyncio
async def test_edit_overflow_split_reports_success_when_all_continuations_land(telegram_adapter):
"""Complete overflow delivery keeps the existing successful contract."""
content = "word " * 120
telegram_adapter._bot.edit_message_text = AsyncMock(return_value=True)
telegram_adapter._bot.send_message = AsyncMock(
side_effect=[_message(202), _message(203), _message(204), _message(205)]
)
result = await telegram_adapter._edit_overflow_split(
"12345", "201", content, finalize=False, metadata={"thread_id": "77"}
)
assert result.success is True
assert result.message_id == result.continuation_message_ids[-1]
assert result.raw_response is None
assert telegram_adapter._bot.edit_message_text.await_count == 1
assert telegram_adapter._bot.send_message.await_count == len(result.continuation_message_ids)
for call in telegram_adapter._bot.send_message.await_args_list:
assert call.kwargs["message_thread_id"] == 77
@pytest.mark.asyncio
async def test_edit_overflow_split_reports_later_partial_failure_after_some_continuations_land(telegram_adapter):
"""Partial metadata tracks the last delivered continuation before failure."""
content = "word " * 120
telegram_adapter._bot.edit_message_text = AsyncMock(return_value=True)
telegram_adapter._bot.send_message = AsyncMock(
side_effect=[
_message(202),
RuntimeError("telegram send failed"),
RuntimeError("telegram send failed"),
]
)
result = await telegram_adapter._edit_overflow_split(
"12345", "201", content, finalize=False, metadata={"thread_id": "77"}
)
assert result.success is False
assert result.message_id == "202"
assert result.raw_response["partial_overflow"] is True
assert result.raw_response["delivered_chunks"] == 2
assert result.raw_response["last_message_id"] == "202"
assert result.continuation_message_ids == ("202",)
@pytest.mark.asyncio
async def test_edit_overflow_split_reports_partial_failure_when_continuation_fails(telegram_adapter):
"""A failed continuation must not be reported as final delivery."""
content = "word " * 120
telegram_adapter._bot.edit_message_text = AsyncMock(return_value=True)
telegram_adapter._bot.send_message = AsyncMock(
side_effect=[RuntimeError("telegram send failed"), RuntimeError("telegram send failed")]
)
result = await telegram_adapter._edit_overflow_split(
"12345", "201", content, finalize=False, metadata={"thread_id": "77"}
)
assert result.success is False
assert result.retryable is True
assert result.error == "overflow_continuation_failed"
assert result.message_id == "201"
assert result.raw_response["partial_overflow"] is True
assert result.raw_response["delivered_chunks"] == 1
assert result.raw_response["total_chunks"] > 1
assert result.raw_response["last_message_id"] == "201"
assert result.raw_response["delivered_prefix"]
assert result.continuation_message_ids == ()
@pytest.mark.asyncio
async def test_stream_consumer_fallback_sends_tail_after_partial_overflow():
"""A partial overflow edit enters fallback instead of marking final delivered."""
adapter = MagicMock()
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.edit_message = AsyncMock(
return_value=SendResult(
success=False,
message_id="preview-1",
error="overflow_continuation_failed",
retryable=True,
raw_response={
"partial_overflow": True,
"delivered_chunks": 1,
"total_chunks": 2,
"last_message_id": "preview-1",
"delivered_prefix": "hello ",
},
)
)
adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="tail-1"))
adapter.delete_message = AsyncMock(return_value=True)
consumer = GatewayStreamConsumer(adapter, "chat-1", metadata={"thread_id": "77"})
consumer._message_id = "preview-1"
consumer._last_sent_text = "hello "
ok = await consumer._send_or_edit("hello world", finalize=True)
assert ok is False
assert consumer.final_response_sent is False
assert consumer.final_content_delivered is False
assert consumer._fallback_final_send is True
assert consumer._fallback_prefix == "hello "
await consumer._send_fallback_final("hello world")
adapter.send.assert_awaited_once()
assert adapter.send.await_args.kwargs["content"] == "world"
assert adapter.send.await_args.kwargs["metadata"] == {"thread_id": "77"}
adapter.delete_message.assert_not_awaited()
assert consumer.final_response_sent is True
assert consumer.final_content_delivered is True