mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
feat(telegram): edit status messages in place instead of appending (#30864)
Closes #30045. Based on @qike-ms's PR #30141. Telegram status callbacks (lifecycle, compression, context-pressure) used to append a fresh bubble on every emit. Now adapter tracks {(chat_id, status_key) -> message_id}; first call sends, subsequent calls edit. Failed edits drop the cache entry and fall through to a fresh send. - gateway/platforms/telegram.py: send_or_update_status() (+34 LOC) - gateway/run.py: route _status_callback_sync through it when the adapter supports it; plain adapter.send() otherwise (+15 LOC) - 5 tests covering first send / edit-in-place / edit-failure fallback / distinct key & chat isolation
This commit is contained in:
parent
4b6d68bd64
commit
9acf949e34
3 changed files with 214 additions and 5 deletions
|
|
@ -468,6 +468,10 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
# "all" — every message triggers a push notification (legacy
|
||||
# behavior; opt-in via display.platforms.telegram.notifications).
|
||||
self._notifications_mode: str = "important"
|
||||
# send_or_update_status() bookkeeping: {(chat_id, status_key) -> bot message_id}
|
||||
# Tracks status bubbles owned by this adapter so subsequent calls with the
|
||||
# same key edit the same message instead of appending new ones (#30045).
|
||||
self._status_message_ids: Dict[tuple, str] = {}
|
||||
|
||||
def _notification_kwargs(
|
||||
self, metadata: Optional[Dict[str, Any]]
|
||||
|
|
@ -1908,6 +1912,40 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
is_connect_timeout = self._looks_like_connect_timeout(e)
|
||||
return SendResult(success=False, error=str(e), retryable=(is_connect_timeout or not is_timeout))
|
||||
|
||||
async def send_or_update_status(
|
||||
self,
|
||||
chat_id: str,
|
||||
status_key: str,
|
||||
content: str,
|
||||
*,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
) -> SendResult:
|
||||
"""Send a status message, or edit the previous one with the same key.
|
||||
|
||||
Issue #30045: progress/status callbacks (context-pressure, lifecycle,
|
||||
compression, etc.) used to append a fresh bubble on every call. With
|
||||
this method, the first call sends and the message id is remembered;
|
||||
subsequent calls with the same (chat_id, status_key) edit that same
|
||||
message in place. If the edit fails (message deleted, too old, etc.)
|
||||
we drop the cached id and send fresh.
|
||||
"""
|
||||
key = (str(chat_id), str(status_key))
|
||||
cached_id = self._status_message_ids.get(key)
|
||||
if cached_id is not None:
|
||||
result = await self.edit_message(
|
||||
chat_id, cached_id, content, finalize=True, metadata=metadata,
|
||||
)
|
||||
if result.success:
|
||||
if result.message_id:
|
||||
self._status_message_ids[key] = str(result.message_id)
|
||||
return result
|
||||
# Edit failed — clear the cached id and fall through to a fresh send.
|
||||
self._status_message_ids.pop(key, None)
|
||||
result = await self.send(chat_id, content, metadata=metadata)
|
||||
if result.success and result.message_id:
|
||||
self._status_message_ids[key] = str(result.message_id)
|
||||
return result
|
||||
|
||||
async def edit_message(
|
||||
self,
|
||||
chat_id: str,
|
||||
|
|
|
|||
|
|
@ -238,6 +238,19 @@ def _prepare_gateway_status_message(platform: Any, event_type: str, message: str
|
|||
return text
|
||||
|
||||
|
||||
async def _send_or_update_status_coro(adapter, chat_id, status_key, content, metadata):
|
||||
"""Route a status message through adapter.send_or_update_status when supported.
|
||||
|
||||
Issue #30045: adapters that implement send_or_update_status (currently
|
||||
Telegram) edit the previous bubble for the same status_key instead of
|
||||
appending a new one. Adapters without the method fall back to plain send.
|
||||
"""
|
||||
sender = getattr(adapter, "send_or_update_status", None)
|
||||
if callable(sender):
|
||||
return await sender(chat_id, status_key, content, metadata=metadata)
|
||||
return await adapter.send(chat_id, content, metadata=metadata)
|
||||
|
||||
|
||||
def _telegramize_command_mentions(text: str, platform: Any) -> str:
|
||||
"""Rewrite slash-command mentions to Telegram-valid command names.
|
||||
|
||||
|
|
@ -16141,11 +16154,7 @@ class GatewayRunner:
|
|||
)
|
||||
return
|
||||
_fut = safe_schedule_threadsafe(
|
||||
_status_adapter.send(
|
||||
_status_chat_id,
|
||||
prepared_message,
|
||||
metadata=_status_thread_metadata,
|
||||
),
|
||||
_send_or_update_status_coro(_status_adapter, _status_chat_id, event_type, prepared_message, _status_thread_metadata),
|
||||
_loop_for_step,
|
||||
logger=logger,
|
||||
log_message=f"status_callback ({event_type}) scheduling error",
|
||||
|
|
|
|||
162
tests/gateway/test_telegram_status_update.py
Normal file
162
tests/gateway/test_telegram_status_update.py
Normal file
|
|
@ -0,0 +1,162 @@
|
|||
"""Tests for TelegramAdapter.send_or_update_status (issue #30045).
|
||||
|
||||
The status-update path must:
|
||||
1. Send a fresh message on the first call for a (chat_id, status_key) pair.
|
||||
2. Edit that same message on subsequent calls with the same key.
|
||||
3. Fall back to sending fresh when the cached message edit fails.
|
||||
4. Keep distinct keys independent (no cross-talk).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import types
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
from gateway.platforms.base import SendResult
|
||||
|
||||
|
||||
def _install_fake_telegram(monkeypatch):
|
||||
"""Stub the python-telegram-bot package so TelegramAdapter can be imported."""
|
||||
fake_telegram = types.ModuleType("telegram")
|
||||
fake_telegram.Update = SimpleNamespace(ALL_TYPES=())
|
||||
fake_telegram.Bot = object
|
||||
fake_telegram.Message = object
|
||||
fake_telegram.InlineKeyboardButton = object
|
||||
fake_telegram.InlineKeyboardMarkup = object
|
||||
|
||||
fake_error = types.ModuleType("telegram.error")
|
||||
fake_error.NetworkError = type("NetworkError", (Exception,), {})
|
||||
fake_error.BadRequest = type("BadRequest", (Exception,), {})
|
||||
fake_error.TimedOut = type("TimedOut", (Exception,), {})
|
||||
fake_telegram.error = fake_error
|
||||
|
||||
fake_constants = types.ModuleType("telegram.constants")
|
||||
fake_constants.ParseMode = SimpleNamespace(MARKDOWN_V2="MarkdownV2")
|
||||
fake_constants.ChatType = SimpleNamespace(
|
||||
GROUP="group", SUPERGROUP="supergroup",
|
||||
CHANNEL="channel", PRIVATE="private",
|
||||
)
|
||||
fake_telegram.constants = fake_constants
|
||||
|
||||
fake_ext = types.ModuleType("telegram.ext")
|
||||
fake_ext.Application = object
|
||||
fake_ext.CommandHandler = object
|
||||
fake_ext.CallbackQueryHandler = object
|
||||
fake_ext.MessageHandler = object
|
||||
fake_ext.ContextTypes = SimpleNamespace(DEFAULT_TYPE=object)
|
||||
fake_ext.filters = object
|
||||
|
||||
fake_request = types.ModuleType("telegram.request")
|
||||
fake_request.HTTPXRequest = object
|
||||
|
||||
monkeypatch.setitem(sys.modules, "telegram", fake_telegram)
|
||||
monkeypatch.setitem(sys.modules, "telegram.error", fake_error)
|
||||
monkeypatch.setitem(sys.modules, "telegram.constants", fake_constants)
|
||||
monkeypatch.setitem(sys.modules, "telegram.ext", fake_ext)
|
||||
monkeypatch.setitem(sys.modules, "telegram.request", fake_request)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def adapter(monkeypatch):
|
||||
_install_fake_telegram(monkeypatch)
|
||||
from gateway.platforms.telegram import TelegramAdapter
|
||||
|
||||
a = TelegramAdapter(PlatformConfig(enabled=True, token="fake-token"))
|
||||
a._bot = MagicMock()
|
||||
# Patch send / edit_message so tests can drive them directly.
|
||||
a.send = AsyncMock()
|
||||
a.edit_message = AsyncMock()
|
||||
return a
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_first_call_sends_and_caches_message_id(adapter):
|
||||
"""First call for a (chat, key) pair must send and remember the id."""
|
||||
adapter.send.return_value = SendResult(success=True, message_id="100")
|
||||
|
||||
result = await adapter.send_or_update_status("chat-1", "lifecycle", "starting")
|
||||
|
||||
assert result.success is True
|
||||
assert result.message_id == "100"
|
||||
adapter.send.assert_awaited_once()
|
||||
adapter.edit_message.assert_not_awaited()
|
||||
assert adapter._status_message_ids[("chat-1", "lifecycle")] == "100"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_second_call_edits_in_place(adapter):
|
||||
"""Same (chat, key) on the second call must edit, not send."""
|
||||
adapter.send.return_value = SendResult(success=True, message_id="100")
|
||||
adapter.edit_message.return_value = SendResult(success=True, message_id="100")
|
||||
|
||||
await adapter.send_or_update_status("chat-1", "lifecycle", "step 1")
|
||||
await adapter.send_or_update_status("chat-1", "lifecycle", "step 2")
|
||||
|
||||
adapter.send.assert_awaited_once()
|
||||
adapter.edit_message.assert_awaited_once()
|
||||
# Edit was directed at the cached message id.
|
||||
args, kwargs = adapter.edit_message.call_args
|
||||
assert args[0] == "chat-1"
|
||||
assert args[1] == "100"
|
||||
assert args[2] == "step 2"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_edit_failure_falls_back_to_fresh_send(adapter):
|
||||
"""When edit_message fails the cache is cleared and a new send happens."""
|
||||
adapter.send.side_effect = [
|
||||
SendResult(success=True, message_id="100"),
|
||||
SendResult(success=True, message_id="200"),
|
||||
]
|
||||
adapter.edit_message.return_value = SendResult(
|
||||
success=False, error="Bad Request: message to edit not found",
|
||||
)
|
||||
|
||||
await adapter.send_or_update_status("chat-1", "lifecycle", "step 1")
|
||||
result = await adapter.send_or_update_status("chat-1", "lifecycle", "step 2")
|
||||
|
||||
assert result.success is True
|
||||
assert result.message_id == "200"
|
||||
assert adapter.send.await_count == 2
|
||||
assert adapter.edit_message.await_count == 1
|
||||
# Cache now points at the fresh message id.
|
||||
assert adapter._status_message_ids[("chat-1", "lifecycle")] == "200"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_distinct_status_keys_do_not_collide(adapter):
|
||||
"""A different status_key gets its own message; the original isn't touched."""
|
||||
adapter.send.side_effect = [
|
||||
SendResult(success=True, message_id="100"),
|
||||
SendResult(success=True, message_id="200"),
|
||||
]
|
||||
|
||||
await adapter.send_or_update_status("chat-1", "lifecycle", "ctx pressure")
|
||||
await adapter.send_or_update_status("chat-1", "model-switch", "switched to opus")
|
||||
|
||||
assert adapter.send.await_count == 2
|
||||
adapter.edit_message.assert_not_awaited()
|
||||
assert adapter._status_message_ids[("chat-1", "lifecycle")] == "100"
|
||||
assert adapter._status_message_ids[("chat-1", "model-switch")] == "200"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_distinct_chat_ids_do_not_collide(adapter):
|
||||
"""Same status_key in different chats must not edit each other's messages."""
|
||||
adapter.send.side_effect = [
|
||||
SendResult(success=True, message_id="100"),
|
||||
SendResult(success=True, message_id="200"),
|
||||
]
|
||||
|
||||
await adapter.send_or_update_status("chat-1", "lifecycle", "first")
|
||||
await adapter.send_or_update_status("chat-2", "lifecycle", "second")
|
||||
|
||||
assert adapter.send.await_count == 2
|
||||
adapter.edit_message.assert_not_awaited()
|
||||
assert adapter._status_message_ids[("chat-1", "lifecycle")] == "100"
|
||||
assert adapter._status_message_ids[("chat-2", "lifecycle")] == "200"
|
||||
Loading…
Add table
Add a link
Reference in a new issue