diff --git a/gateway/config.py b/gateway/config.py index 335b81d8d3..1819665a63 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -195,6 +195,14 @@ class StreamingConfig: edit_interval: float = 1.0 # Seconds between message edits (Telegram rate-limits at ~1/s) buffer_threshold: int = 40 # Chars before forcing an edit cursor: str = " ▉" # Cursor shown during streaming + # Ported from openclaw/openclaw#72038. When >0, the final edit for + # a long-running streamed response is delivered as a fresh message + # if the original preview has been visible for at least this many + # seconds, so the platform's visible timestamp reflects completion + # time instead of the preview creation time. Currently applied to + # Telegram only (other platforms ignore the setting). Default 60s + # matches the OpenClaw rollout. Set to 0 to disable. + fresh_final_after_seconds: float = 60.0 def to_dict(self) -> Dict[str, Any]: return { @@ -203,6 +211,7 @@ class StreamingConfig: "edit_interval": self.edit_interval, "buffer_threshold": self.buffer_threshold, "cursor": self.cursor, + "fresh_final_after_seconds": self.fresh_final_after_seconds, } @classmethod @@ -215,6 +224,9 @@ class StreamingConfig: edit_interval=float(data.get("edit_interval", 1.0)), buffer_threshold=int(data.get("buffer_threshold", 40)), cursor=data.get("cursor", " ▉"), + fresh_final_after_seconds=float( + data.get("fresh_final_after_seconds", 60.0) + ), ) diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 8cb4f7c0eb..3068318e41 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1258,6 +1258,27 @@ class BasePlatformAdapter(ABC): """ return SendResult(success=False, error="Not supported") + async def delete_message( + self, + chat_id: str, + message_id: str, + ) -> bool: + """ + Delete a previously sent message. Optional — platforms that don't + support deletion return ``False`` and callers fall back to leaving + the message in place. + + Used by the stream consumer's fresh-final cleanup path (see + openclaw/openclaw#72038) to remove long-lived preview messages + after sending the completed reply as a fresh message so the + platform's visible timestamp reflects completion time. + + Returns ``True`` on successful deletion, ``False`` otherwise. + Subclasses should override for platforms with a deletion API + (e.g. Telegram ``deleteMessage``). + """ + return False + async def send_typing(self, chat_id: str, metadata=None) -> None: """ Send a typing indicator. diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index be1bf494c5..6c7658b308 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -1209,6 +1209,31 @@ class TelegramAdapter(BasePlatformAdapter): ) return SendResult(success=False, error=str(e)) + async def delete_message(self, chat_id: str, message_id: str) -> bool: + """Delete a previously sent Telegram message. + + Used by the stream consumer's fresh-final cleanup path (ported + from openclaw/openclaw#72038) to remove long-lived preview + messages after sending the completed reply as a fresh message. + Telegram's Bot API ``deleteMessage`` works for bot-posted + messages in the last 48 hours. Failures are non-fatal — the + caller leaves the preview in place and logs at debug level. + """ + if not self._bot: + return False + try: + await self._bot.delete_message( + chat_id=int(chat_id), + message_id=int(message_id), + ) + return True + except Exception as e: + logger.debug( + "[%s] Failed to delete Telegram message %s: %s", + self.name, message_id, e, + ) + return False + async def send_update_prompt( self, chat_id: str, prompt: str, default: str = "", session_key: str = "", diff --git a/gateway/run.py b/gateway/run.py index 596edf2edd..5dcdb05f83 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -9154,11 +9154,21 @@ class GatewayRunner: if source.platform == Platform.MATRIX: _effective_cursor = "" _buffer_only = True + # Fresh-final applies to Telegram only — other + # platforms either edit in place cheaply (Discord, + # Slack) or don't have the timestamp-on-edit + # problem. (Ported from openclaw/openclaw#72038.) + _fresh_final_secs = ( + float(getattr(_scfg, "fresh_final_after_seconds", 0.0) or 0.0) + if source.platform == Platform.TELEGRAM + else 0.0 + ) _consumer_cfg = StreamConsumerConfig( edit_interval=_scfg.edit_interval, buffer_threshold=_scfg.buffer_threshold, cursor=_effective_cursor, buffer_only=_buffer_only, + fresh_final_after_seconds=_fresh_final_secs, ) _stream_consumer = GatewayStreamConsumer( adapter=_adapter, @@ -9842,11 +9852,21 @@ class GatewayRunner: if source.platform == Platform.MATRIX: _effective_cursor = "" _buffer_only = True + # Fresh-final applies to Telegram only — other + # platforms either edit in place cheaply or don't + # have the edit-timestamp-stays-stale problem. + # (Ported from openclaw/openclaw#72038.) + _fresh_final_secs = ( + float(getattr(_scfg, "fresh_final_after_seconds", 0.0) or 0.0) + if source.platform == Platform.TELEGRAM + else 0.0 + ) _consumer_cfg = StreamConsumerConfig( edit_interval=_scfg.edit_interval, buffer_threshold=_scfg.buffer_threshold, cursor=_effective_cursor, buffer_only=_buffer_only, + fresh_final_after_seconds=_fresh_final_secs, ) _stream_consumer = GatewayStreamConsumer( adapter=_adapter, diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 78e365712d..1adbdd3a69 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -44,6 +44,14 @@ class StreamConsumerConfig: buffer_threshold: int = 40 cursor: str = " ▉" buffer_only: bool = False + # When >0, the final edit for a streamed response is delivered as a + # fresh message if the original preview has been visible for at least + # this many seconds. This makes the platform's visible timestamp + # reflect completion time instead of first-token time for long-running + # responses (e.g. reasoning models that stream slowly). Ported from + # openclaw/openclaw#72038. Default 0 = always edit in place (legacy + # behavior). The gateway enables this selectively per-platform. + fresh_final_after_seconds: float = 0.0 class GatewayStreamConsumer: @@ -91,6 +99,12 @@ class GatewayStreamConsumer: self._queue: queue.Queue = queue.Queue() self._accumulated = "" self._message_id: Optional[str] = None + # Wall-clock timestamp (time.monotonic) when ``_message_id`` was + # first assigned from a successful first-send. Used by the + # fresh-final logic to detect long-lived previews whose edit + # timestamps would be stale by completion time. Ported from + # openclaw/openclaw#72038. + self._message_created_ts: Optional[float] = None self._already_sent = False self._edit_supported = True # Disabled when progressive edits are no longer usable self._last_edit_time = 0.0 @@ -136,6 +150,7 @@ class GatewayStreamConsumer: if preserve_no_edit and self._message_id == "__no_edit__": return self._message_id = None + self._message_created_ts = None self._accumulated = "" self._last_sent_text = "" self._fallback_final_send = False @@ -734,6 +749,81 @@ class GatewayStreamConsumer: logger.error("Commentary send error: %s", e) return False + def _should_send_fresh_final(self) -> bool: + """Return True when a long-lived preview should be replaced with a + fresh final message instead of an edit. + + Conditions: + - Fresh-final is enabled (``fresh_final_after_seconds > 0``). + - We have a real preview message id (not the ``__no_edit__`` sentinel + and not ``None``). + - The preview has been visible for at least the configured threshold. + + Ported from openclaw/openclaw#72038. + """ + threshold = getattr(self.cfg, "fresh_final_after_seconds", 0.0) or 0.0 + if threshold <= 0: + return False + if not self._message_id or self._message_id == "__no_edit__": + return False + if self._message_created_ts is None: + return False + age = time.monotonic() - self._message_created_ts + return age >= threshold + + async def _try_fresh_final(self, text: str) -> bool: + """Send ``text`` as a brand-new message (best-effort delete the old + preview) so the platform's visible timestamp reflects completion + time. Returns True on successful delivery, False on any failure so + the caller falls back to the normal edit path. + + Ported from openclaw/openclaw#72038. + """ + old_message_id = self._message_id + try: + result = await self.adapter.send( + chat_id=self.chat_id, + content=text, + metadata=self.metadata, + ) + except Exception as e: + logger.debug("Fresh-final send failed, falling back to edit: %s", e) + return False + if not getattr(result, "success", False): + return False + # Successful fresh send — try to delete the stale preview so the + # user doesn't see the old edit-stuck message underneath. Cleanup + # is best-effort; platforms that don't implement ``delete_message`` + # just leave the preview behind (still an acceptable outcome — + # the visible final timestamp is the important part). + if old_message_id and old_message_id != "__no_edit__": + delete_fn = getattr(self.adapter, "delete_message", None) + if delete_fn is not None: + try: + await delete_fn(self.chat_id, old_message_id) + except Exception as e: + logger.debug( + "Fresh-final preview cleanup failed (%s): %s", + old_message_id, e, + ) + # Adopt the new message id as the current message so subsequent + # callers (e.g. overflow split loops, finalize retries) see a + # consistent state. + new_message_id = getattr(result, "message_id", None) + if new_message_id: + self._message_id = new_message_id + self._message_created_ts = time.monotonic() + else: + # Send succeeded but platform didn't return an id — treat the + # delivery as final-only and fall back to "__no_edit__" so we + # don't try to edit something we can't address. + self._message_id = "__no_edit__" + self._message_created_ts = None + self._already_sent = True + self._last_sent_text = text + self._final_response_sent = True + return True + async def _send_or_edit(self, text: str, *, finalize: bool = False) -> bool: """Send or edit the streaming message. @@ -786,6 +876,22 @@ class GatewayStreamConsumer: finalize and self._adapter_requires_finalize ): return True + # Fresh-final for long-lived previews: when finalizing + # the last edit in a streaming sequence, if the + # original preview has been visible for at least + # ``fresh_final_after_seconds``, send the completed + # reply as a fresh message so the platform's visible + # timestamp reflects completion time instead of the + # preview creation time. Best-effort cleanup of the + # old preview follows. Ported from + # openclaw/openclaw#72038. Gated by config so the + # legacy edit-in-place path stays the default. + if ( + finalize + and self._should_send_fresh_final() + and await self._try_fresh_final(text) + ): + return True # Edit existing message result = await self.adapter.edit_message( chat_id=self.chat_id, @@ -852,6 +958,10 @@ class GatewayStreamConsumer: if result.success: if result.message_id: self._message_id = result.message_id + # Track when the preview first became visible to + # the user so fresh-final logic can detect stale + # preview timestamps on long-running responses. + self._message_created_ts = time.monotonic() else: self._edit_supported = False self._already_sent = True diff --git a/tests/gateway/test_stream_consumer_fresh_final.py b/tests/gateway/test_stream_consumer_fresh_final.py new file mode 100644 index 0000000000..95f55a2117 --- /dev/null +++ b/tests/gateway/test_stream_consumer_fresh_final.py @@ -0,0 +1,236 @@ +"""Regression tests for the fresh-final-for-long-lived-previews path. + +Ported from openclaw/openclaw#72038. When a streamed preview has been +visible long enough that the platform's edit timestamp would be +noticeably stale by completion time, the stream consumer delivers the +final reply as a brand-new message and best-effort deletes the old +preview. This makes Telegram's visible timestamp reflect completion +time instead of first-token time. +""" + +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig + + +def _make_adapter(*, supports_delete: bool = True) -> MagicMock: + """Build a minimal MagicMock adapter wired for send/edit/delete.""" + adapter = MagicMock() + adapter.REQUIRES_EDIT_FINALIZE = False + adapter.MAX_MESSAGE_LENGTH = 4096 + adapter.send = AsyncMock(return_value=SimpleNamespace( + success=True, message_id="initial_preview", + )) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace( + success=True, message_id="initial_preview", + )) + if supports_delete: + adapter.delete_message = AsyncMock(return_value=True) + else: + # Adapter without the optional delete_message method — fresh-final + # should still work, it just leaves the stale preview in place. + del adapter.delete_message # type: ignore[attr-defined] + return adapter + + +class TestFreshFinalForLongLivedPreviews: + """openclaw#72038 port — send fresh final when preview is old.""" + + @pytest.mark.asyncio + async def test_disabled_by_default_still_edits_in_place(self): + """``fresh_final_after_seconds=0`` preserves the legacy edit path.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig(fresh_final_after_seconds=0.0), + ) + await consumer._send_or_edit("hello") + # Pretend the preview has been visible for a long time. + consumer._message_created_ts = 0.0 # far in the past + await consumer._send_or_edit("hello world", finalize=True) + # Should edit, not send a fresh message. + assert adapter.send.call_count == 1 # only the initial send + adapter.edit_message.assert_called_once() + + @pytest.mark.asyncio + async def test_short_lived_preview_edits_in_place(self): + """Finalizing a preview younger than the threshold → normal edit.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig(fresh_final_after_seconds=60.0), + ) + await consumer._send_or_edit("hello") + # Preview is "new" — leave _message_created_ts at its real value. + await consumer._send_or_edit("hello world", finalize=True) + assert adapter.send.call_count == 1 + adapter.edit_message.assert_called_once() + + @pytest.mark.asyncio + async def test_long_lived_preview_sends_fresh_final(self): + """Finalizing a preview older than the threshold → fresh send.""" + adapter = _make_adapter() + adapter.send.side_effect = [ + SimpleNamespace(success=True, message_id="initial_preview"), + SimpleNamespace(success=True, message_id="fresh_final"), + ] + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig(fresh_final_after_seconds=60.0), + ) + await consumer._send_or_edit("hello") + # Force the preview to look stale (visible for > 60s). + consumer._message_created_ts = 0.0 # zero = ~uptime seconds old + await consumer._send_or_edit("hello world", finalize=True) + # Fresh send happened; no edit of the old preview. + assert adapter.send.call_count == 2 + adapter.edit_message.assert_not_called() + # The old preview was deleted as cleanup. + adapter.delete_message.assert_awaited_once_with("chat", "initial_preview") + # State was updated to the new message id. + assert consumer._message_id == "fresh_final" + assert consumer._final_response_sent is True + + @pytest.mark.asyncio + async def test_fresh_final_without_delete_support_is_best_effort(self): + """Adapter lacking ``delete_message`` still gets the fresh send.""" + adapter = _make_adapter(supports_delete=False) + adapter.send.side_effect = [ + SimpleNamespace(success=True, message_id="initial_preview"), + SimpleNamespace(success=True, message_id="fresh_final"), + ] + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig(fresh_final_after_seconds=60.0), + ) + await consumer._send_or_edit("hello") + consumer._message_created_ts = 0.0 + await consumer._send_or_edit("hello world", finalize=True) + assert adapter.send.call_count == 2 + adapter.edit_message.assert_not_called() + # No delete attempt — just the fresh send. + assert consumer._message_id == "fresh_final" + + @pytest.mark.asyncio + async def test_fresh_final_fallback_to_edit_on_send_failure(self): + """If the fresh send fails, fall back to the normal edit path.""" + adapter = _make_adapter() + adapter.send.side_effect = [ + SimpleNamespace(success=True, message_id="initial_preview"), + SimpleNamespace(success=False, error="network"), + ] + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig(fresh_final_after_seconds=60.0), + ) + await consumer._send_or_edit("hello") + consumer._message_created_ts = 0.0 + ok = await consumer._send_or_edit("hello world", finalize=True) + # Fresh send was attempted and failed → edit happened instead. + assert adapter.send.call_count == 2 + adapter.edit_message.assert_called_once() + assert ok is True + + @pytest.mark.asyncio + async def test_only_finalize_triggers_fresh_final(self): + """Intermediate edits (``finalize=False``) never switch to fresh send.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig(fresh_final_after_seconds=60.0), + ) + await consumer._send_or_edit("hello") + consumer._message_created_ts = 0.0 # stale + await consumer._send_or_edit("hello partial") # no finalize + assert adapter.send.call_count == 1 + adapter.edit_message.assert_called_once() + + @pytest.mark.asyncio + async def test_no_edit_sentinel_is_not_affected(self): + """Platforms with the ``__no_edit__`` sentinel never go fresh-final.""" + adapter = _make_adapter() + adapter.send.return_value = SimpleNamespace(success=True, message_id=None) + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig(fresh_final_after_seconds=60.0), + ) + await consumer._send_or_edit("hello") + assert consumer._message_id == "__no_edit__" + assert consumer._message_created_ts is None + # Even with finalize=True, no fresh send — the sentinel gates it. + assert consumer._should_send_fresh_final() is False + + +class TestStreamConsumerConfigFreshFinalField: + """The dataclass field must exist and default to 0 (disabled).""" + + def test_default_is_disabled(self): + cfg = StreamConsumerConfig() + assert cfg.fresh_final_after_seconds == 0.0 + + def test_field_is_configurable(self): + cfg = StreamConsumerConfig(fresh_final_after_seconds=120.0) + assert cfg.fresh_final_after_seconds == 120.0 + + +class TestStreamingConfigFreshFinalField: + """The gateway-level StreamingConfig carries the setting.""" + + def test_default_enables_with_60s(self): + from gateway.config import StreamingConfig + cfg = StreamingConfig() + assert cfg.fresh_final_after_seconds == 60.0 + + def test_from_dict_uses_default_when_missing(self): + from gateway.config import StreamingConfig + cfg = StreamingConfig.from_dict({"enabled": True}) + assert cfg.fresh_final_after_seconds == 60.0 + + def test_from_dict_respects_explicit_zero(self): + from gateway.config import StreamingConfig + cfg = StreamingConfig.from_dict({ + "enabled": True, + "fresh_final_after_seconds": 0, + }) + assert cfg.fresh_final_after_seconds == 0.0 + + def test_to_dict_round_trip(self): + from gateway.config import StreamingConfig + original = StreamingConfig(fresh_final_after_seconds=90.0) + restored = StreamingConfig.from_dict(original.to_dict()) + assert restored.fresh_final_after_seconds == 90.0 + + +class TestTelegramAdapterDeleteMessage: + """Contract: Telegram adapter implements ``delete_message``.""" + + def test_delete_message_method_exists(self): + telegram = pytest.importorskip("gateway.platforms.telegram") + import inspect + cls = telegram.TelegramAdapter + assert hasattr(cls, "delete_message"), ( + "TelegramAdapter.delete_message is required for the fresh-final " + "cleanup path (openclaw/openclaw#72038 port)." + ) + sig = inspect.signature(cls.delete_message) + params = list(sig.parameters) + assert params[:3] == ["self", "chat_id", "message_id"] + + def test_base_adapter_default_returns_false(self): + """BasePlatformAdapter.delete_message default = no-op returning False.""" + from gateway.platforms.base import BasePlatformAdapter + import inspect + sig = inspect.signature(BasePlatformAdapter.delete_message) + assert list(sig.parameters)[:3] == ["self", "chat_id", "message_id"] diff --git a/website/docs/user-guide/configuration.md b/website/docs/user-guide/configuration.md index 61eed114e0..d60ad3ecff 100644 --- a/website/docs/user-guide/configuration.md +++ b/website/docs/user-guide/configuration.md @@ -1114,6 +1114,7 @@ streaming: edit_interval: 0.3 # Seconds between message edits buffer_threshold: 40 # Characters before forcing an edit flush cursor: " ▉" # Cursor shown during streaming + fresh_final_after_seconds: 60 # Send fresh final (Telegram) when preview is this old; 0 = always edit in place ``` When enabled, the bot sends a message on the first token, then progressively edits it as more tokens arrive. Platforms that don't support message editing (Signal, Email, Home Assistant) are auto-detected on the first attempt — streaming is gracefully disabled for that session with no flood of messages. @@ -1122,6 +1123,8 @@ For separate natural mid-turn assistant updates without progressive token editin **Overflow handling:** If the streamed text exceeds the platform's message length limit (~4096 chars), the current message is finalized and a new one starts automatically. +**Fresh final (Telegram):** Telegram's `editMessageText` preserves the original message timestamp, so a long-running streamed reply would keep the first-token timestamp even after completion. When `fresh_final_after_seconds > 0` (default `60`), the completed reply is delivered as a brand-new message (with the stale preview best-effort deleted) so Telegram's visible timestamp reflects completion time. Short previews still finalize in place. Set to `0` to always edit in place. + :::note Streaming is disabled by default. Enable it in `~/.hermes/config.yaml` to try the streaming UX. :::