diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 9f53042395..ea02279706 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -416,7 +416,7 @@ def is_host_excluded_by_no_proxy(hostname: str, no_proxy_value: str | None = Non from dataclasses import dataclass, field from datetime import datetime from pathlib import Path -from typing import Dict, List, Optional, Any, Callable, Awaitable, Tuple +from typing import Dict, List, Optional, Any, Callable, Awaitable, Tuple, Union from enum import Enum from pathlib import Path as _Path @@ -981,7 +981,7 @@ def coerce_plaintext_gateway_command(event: "MessageEvent") -> None: return -@dataclass +@dataclass class SendResult: """Result of sending a message.""" success: bool @@ -991,6 +991,45 @@ class SendResult: retryable: bool = False # True for transient connection errors — base will retry automatically +class EphemeralReply(str): + """System-notice reply that auto-deletes after a TTL. + + Slash-command handlers in ``gateway/run.py`` can return this wrapper + instead of a plain string to request that the reply message be deleted + after ``ttl_seconds`` on platforms that support ``delete_message``. + + Subclassing ``str`` keeps the wrapper transparent to anything that + treats handler return values as text (existing tests use ``in`` / + ``startswith`` / equality; the ``_process_message_background`` pipeline + extracts attachments from the string content). ``isinstance(r, + EphemeralReply)`` still distinguishes ephemeral replies from plain + strings so the send path can schedule deletion. + + Platforms that don't override :meth:`BasePlatformAdapter.delete_message` + silently ignore the TTL — the message is sent normally and left in + place. When ``ttl_seconds`` is ``None``, the pipeline uses the + configured ``display.ephemeral_system_ttl`` default. A default of ``0`` + disables auto-deletion globally, preserving prior behavior. + """ + + ttl_seconds: Optional[int] + + def __new__(cls, text: str, ttl_seconds: Optional[int] = None): + instance = super().__new__(cls, text) + instance.ttl_seconds = ttl_seconds + return instance + + @property + def text(self) -> str: + """Return the underlying text. + + Provided for call sites that want an explicit string conversion, + though ``str(reply)`` and using ``reply`` directly where a string + is expected both work identically. + """ + return str.__str__(self) + + def merge_pending_message_event( pending_messages: Dict[str, MessageEvent], session_key: str, @@ -1073,8 +1112,10 @@ _RETRYABLE_ERROR_PATTERNS = ( ) -# Type for message handlers -MessageHandler = Callable[[MessageEvent], Awaitable[Optional[str]]] +# Type for message handlers. Handlers may return a plain string (normal +# reply), an ``EphemeralReply`` to opt the reply into auto-deletion, or +# ``None`` when the response was already delivered (e.g. via streaming). +MessageHandler = Callable[[MessageEvent], Awaitable[Optional[Union[str, "EphemeralReply"]]]] def resolve_channel_prompt( @@ -1459,6 +1500,64 @@ class BasePlatformAdapter(ABC): """ return False + def _get_ephemeral_system_ttl_default(self) -> int: + """Read ``display.ephemeral_system_ttl`` from config. + + Returns the TTL in seconds to use when an :class:`EphemeralReply` + does not specify one explicitly. ``0`` (the default) disables + auto-deletion. Non-fatal if config is unreadable. + """ + try: + from hermes_cli.config import load_config as _load_config + except Exception: + return 0 + try: + cfg = _load_config() + except Exception: + return 0 + display = cfg.get("display", {}) if isinstance(cfg, dict) else {} + if not isinstance(display, dict): + return 0 + raw = display.get("ephemeral_system_ttl", 0) + try: + return int(raw) + except (TypeError, ValueError): + return 0 + + def _schedule_ephemeral_delete( + self, + chat_id: str, + message_id: str, + ttl_seconds: int, + ) -> None: + """Spawn a detached task that deletes ``message_id`` after ``ttl_seconds``. + + Best-effort — failures (gateway restart, permission denied, message + too old for Telegram's 48h window) are swallowed at debug level. + Does not block the caller. + """ + + async def _run_delete() -> None: + try: + await asyncio.sleep(max(1, int(ttl_seconds))) + await self.delete_message(chat_id=chat_id, message_id=message_id) + except asyncio.CancelledError: + raise + except Exception as e: + logger.debug( + "[%s] Ephemeral delete failed for %s/%s: %s", + self.name, chat_id, message_id, e, + ) + + coro = _run_delete() + try: + asyncio.create_task(coro) + except RuntimeError: + # No running loop (e.g. unit tests that never reach the async + # path). Close the coroutine cleanly so Python doesn't warn + # about it never being awaited, then drop silently. + coro.close() + async def send_slash_confirm( self, chat_id: str, @@ -2048,6 +2147,28 @@ class BasePlatformAdapter(ABC): lowered = error.lower() return "timed out" in lowered or "readtimeout" in lowered or "writetimeout" in lowered + def _unwrap_ephemeral(self, response: Any) -> Tuple[Optional[str], int]: + """Unwrap a handler response into (text, ttl_seconds). + + Accepts a plain string, ``None``, or an :class:`EphemeralReply`. + Returns ``(text, ttl)`` where ``ttl > 0`` means the caller should + schedule a deletion via :meth:`_schedule_ephemeral_delete` after + the send succeeds. ``ttl`` is forced to 0 when the adapter + doesn't override :meth:`delete_message` so non-supporting + platforms silently degrade to normal sends. + """ + if isinstance(response, EphemeralReply): + ttl = response.ttl_seconds + if ttl is None: + try: + ttl = int(self._get_ephemeral_system_ttl_default()) + except Exception: + ttl = 0 + if ttl and ttl > 0 and type(self).delete_message is BasePlatformAdapter.delete_message: + ttl = 0 + return response.text, int(ttl or 0) + return response, 0 + async def _send_with_retry( self, chat_id: str, @@ -2355,13 +2476,20 @@ class BasePlatformAdapter(ABC): release_guard=False, discard_pending=False, ) - if response: - await self._send_with_retry( + _text, _eph_ttl = self._unwrap_ephemeral(response) + if _text: + _r = await self._send_with_retry( chat_id=event.source.chat_id, - content=response, + content=_text, reply_to=event.message_id, metadata=thread_meta, ) + if _eph_ttl > 0 and _r.success and _r.message_id: + self._schedule_ephemeral_delete( + chat_id=event.source.chat_id, + message_id=_r.message_id, + ttl_seconds=_eph_ttl, + ) except Exception: # On failure, restore the original guard if one still exists so # we don't leave the session in a half-reset state. @@ -2441,13 +2569,20 @@ class BasePlatformAdapter(ABC): try: _thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None response = await self._message_handler(event) - if response: - await self._send_with_retry( + _text, _eph_ttl = self._unwrap_ephemeral(response) + if _text: + _r = await self._send_with_retry( chat_id=event.source.chat_id, - content=response, + content=_text, reply_to=event.message_id, metadata=_thread_meta, ) + if _eph_ttl > 0 and _r.success and _r.message_id: + self._schedule_ephemeral_delete( + chat_id=event.source.chat_id, + message_id=_r.message_id, + ttl_seconds=_eph_ttl, + ) except Exception as e: logger.error("[%s] Command '/%s' dispatch failed: %s", self.name, cmd, e, exc_info=True) return @@ -2553,7 +2688,16 @@ class BasePlatformAdapter(ABC): # Call the handler (this can take a while with tool calls) response = await self._message_handler(event) - + + # Slash-command handlers may return an EphemeralReply sentinel to + # request that their reply message auto-delete after a TTL (used + # for system notices like "✨ New session started!" that the user + # doesn't need to keep in the thread). Unwrap here so all the + # downstream extract_media / text-processing logic sees a plain + # string, and remember the TTL + platform capability so the + # post-send block can schedule the deletion. + response, _ephemeral_ttl = self._unwrap_ephemeral(response) + # Send response if any. A None/empty response is normal when # streaming already delivered the text (already_sent=True) or # when the message was queued behind an active agent. Log at @@ -2642,6 +2786,21 @@ class BasePlatformAdapter(ABC): ) _record_delivery(result) + # Schedule auto-deletion of system-notice replies. + # Detached so the handler returns immediately; errors + # (permission denied, message too old) are swallowed. + if ( + _ephemeral_ttl + and _ephemeral_ttl > 0 + and result.success + and result.message_id + ): + self._schedule_ephemeral_delete( + chat_id=event.source.chat_id, + message_id=result.message_id, + ttl_seconds=_ephemeral_ttl, + ) + # Human-like pacing delay between text and media human_delay = self._get_human_delay() diff --git a/gateway/run.py b/gateway/run.py index 90faf9a745..d991ac4ff8 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -29,7 +29,7 @@ from collections import OrderedDict from contextvars import copy_context from pathlib import Path from datetime import datetime -from typing import Dict, Optional, Any, List +from typing import Dict, Optional, Any, List, Union # account_usage imports the OpenAI SDK chain (~230 ms). Only needed by # /usage; we still import it at module top in the gateway because test @@ -454,6 +454,7 @@ from gateway.session import ( from gateway.delivery import DeliveryRouter from gateway.platforms.base import ( BasePlatformAdapter, + EphemeralReply, MessageEvent, MessageType, merge_pending_message_event, @@ -4472,7 +4473,7 @@ class GatewayRunner: invalidation_reason="stop_command", ) logger.info("STOP for session %s — agent interrupted, session lock released", _quick_key) - return "⚡ Stopped. You can continue this session." + return EphemeralReply("⚡ Stopped. You can continue this session.") # /reset and /new must bypass the running-agent guard so they # actually dispatch as commands instead of being queued as user @@ -4677,7 +4678,7 @@ class GatewayRunner: # Force-clean the sentinel so the session is unlocked. self._release_running_agent_state(_quick_key) logger.info("HARD STOP (pending) for session %s — sentinel cleared", _quick_key) - return "⚡ Force-stopped. The agent was still starting — session unlocked." + return EphemeralReply("⚡ Force-stopped. The agent was still starting — session unlocked.") # Queue the message so it will be picked up after the # agent starts. adapter = self.adapters.get(source.platform) @@ -6353,7 +6354,7 @@ class GatewayRunner: return "\n".join(lines) - async def _handle_reset_command(self, event: MessageEvent) -> str: + async def _handle_reset_command(self, event: MessageEvent) -> Union[str, EphemeralReply]: """Handle /new or /reset command.""" source = event.source @@ -6464,8 +6465,8 @@ class GatewayRunner: _tip_line = "" if session_info: - return f"{header}\n\n{session_info}{_tip_line}" - return f"{header}{_tip_line}" + return EphemeralReply(f"{header}\n\n{session_info}{_tip_line}") + return EphemeralReply(f"{header}{_tip_line}") async def _handle_profile_command(self, event: MessageEvent) -> str: """Handle /profile — show active profile name and home directory.""" @@ -6713,7 +6714,7 @@ class GatewayRunner: return "\n".join(lines) - async def _handle_stop_command(self, event: MessageEvent) -> str: + async def _handle_stop_command(self, event: MessageEvent) -> Union[str, EphemeralReply]: """Handle /stop command - interrupt a running agent. When an agent is truly hung (blocked thread that never checks @@ -6738,7 +6739,7 @@ class GatewayRunner: invalidation_reason="stop_command_pending", ) logger.info("STOP (pending) for session %s — sentinel cleared", session_key) - return "⚡ Stopped. The agent hadn't started yet — you can continue this session." + return EphemeralReply("⚡ Stopped. The agent hadn't started yet — you can continue this session.") if agent: # Force-clean the session lock so a truly hung agent doesn't # keep it locked forever. @@ -6748,11 +6749,11 @@ class GatewayRunner: interrupt_reason=_INTERRUPT_REASON_STOP, invalidation_reason="stop_command_handler", ) - return "⚡ Stopped. You can continue this session." + return EphemeralReply("⚡ Stopped. You can continue this session.") else: return "No active task to stop." - async def _handle_restart_command(self, event: MessageEvent) -> str: + async def _handle_restart_command(self, event: MessageEvent) -> Union[str, EphemeralReply]: """Handle /restart command - drain active work, then restart the gateway.""" # Defensive idempotency check: if the previous gateway process # recorded this same /restart (same platform + update_id) and the new @@ -6778,7 +6779,7 @@ class GatewayRunner: count = self._running_agent_count() if count: return f"⏳ Draining {count} active agent(s) before restart..." - return "⏳ Gateway restart already in progress..." + return EphemeralReply("⏳ Gateway restart already in progress...") # Save the requester's routing info so the new gateway process can # notify them once it comes back online. @@ -6830,7 +6831,7 @@ class GatewayRunner: self.request_restart(detached=True, via_service=False) if active_agents: return f"⏳ Draining {active_agents} active agent(s) before restart..." - return "♻ Restarting gateway. If you aren't notified within 60 seconds, restart from the console with `hermes gateway restart`." + return EphemeralReply("♻ Restarting gateway. If you aren't notified within 60 seconds, restart from the console with `hermes gateway restart`.") def _is_stale_restart_redelivery(self, event: MessageEvent) -> bool: """Return True if this /restart is a Telegram re-delivery we already handled. @@ -8321,7 +8322,7 @@ class GatewayRunner: return f"⚡ ✓ Priority Processing: **{label}** (saved to config)\n_(takes effect on next message)_" return f"⚡ ✓ Priority Processing: **{label}** (this session only)" - async def _handle_yolo_command(self, event: MessageEvent) -> str: + async def _handle_yolo_command(self, event: MessageEvent) -> Union[str, EphemeralReply]: """Handle /yolo — toggle dangerous command approval bypass for this session only.""" from tools.approval import ( disable_session_yolo, @@ -8333,10 +8334,10 @@ class GatewayRunner: current = is_session_yolo_enabled(session_key) if current: disable_session_yolo(session_key) - return "⚠️ YOLO mode **OFF** for this session — dangerous commands will require approval." + return EphemeralReply("⚠️ YOLO mode **OFF** for this session — dangerous commands will require approval.") else: enable_session_yolo(session_key) - return "⚡ YOLO mode **ON** for this session — all commands auto-approved. Use with caution." + return EphemeralReply("⚡ YOLO mode **ON** for this session — all commands auto-approved. Use with caution.") async def _handle_verbose_command(self, event: MessageEvent) -> str: """Handle /verbose command — cycle tool progress display mode. diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 82498c81cc..df1a5943f7 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -775,6 +775,14 @@ DEFAULT_CONFIG = { "tool_progress_command": False, # Enable /verbose command in messaging gateway "tool_progress_overrides": {}, # DEPRECATED — use display.platforms instead "tool_preview_length": 0, # Max chars for tool call previews (0 = no limit, show full paths/commands) + # Auto-delete system-notice replies (e.g. "✨ New session started!", + # "♻ Restarting gateway…", "⚡ Stopped…") after N seconds on platforms + # that support message deletion (currently Telegram; other platforms + # ignore and leave the message in place). Only affects slash-command + # replies wrapped with gateway.platforms.base.EphemeralReply — agent + # responses and content messages are never touched. Default 0 + # (disabled) preserves prior behavior. + "ephemeral_system_ttl": 0, "platforms": {}, # Per-platform display overrides: {"telegram": {"tool_progress": "all"}, "slack": {"tool_progress": "off"}} # Gateway runtime-metadata footer appended to the FINAL message of a turn # (disabled by default to keep replies minimal). When enabled, renders diff --git a/tests/gateway/test_ephemeral_reply.py b/tests/gateway/test_ephemeral_reply.py new file mode 100644 index 0000000000..41565e163b --- /dev/null +++ b/tests/gateway/test_ephemeral_reply.py @@ -0,0 +1,336 @@ +"""Tests for EphemeralReply — system-notice auto-delete in gateway adapters. + +Slash-command handlers in ``gateway/run.py`` can return an +``EphemeralReply`` wrapper to request auto-deletion of the reply message +after a TTL. The base adapter unwraps the sentinel before sending and +schedules a detached delete task when the platform supports +``delete_message``. + +Covered: + +1. ``_unwrap_ephemeral`` returns text + ttl for EphemeralReply, and + passes plain strings through unchanged. +2. TTL is zeroed on platforms that don't override ``delete_message`` + (silent degrade — message stays in place). +3. TTL is honored on platforms that DO override ``delete_message``. +4. ``_schedule_ephemeral_delete`` invokes ``delete_message`` after the + configured delay with the correct chat_id / message_id. +5. ``_process_message_background`` sends the unwrapped text (not the + sentinel object) and schedules deletion when appropriate. +6. The two busy-session bypass paths also unwrap + schedule. +""" + +import asyncio +from unittest.mock import AsyncMock, patch + +import pytest + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import ( + BasePlatformAdapter, + EphemeralReply, + MessageEvent, + MessageType, + SendResult, +) +from gateway.session import SessionSource + + +class _NoDeleteAdapter(BasePlatformAdapter): + """Adapter that does NOT override delete_message (silent degrade).""" + + async def connect(self): + pass + + async def disconnect(self): + pass + + async def send(self, chat_id, content="", **kwargs): + return SendResult(success=True, message_id="m-1") + + async def get_chat_info(self, chat_id): + return {} + + +class _DeleteCapableAdapter(BasePlatformAdapter): + """Adapter that overrides delete_message (TTL honored).""" + + def __init__(self, *a, **kw): + super().__init__(*a, **kw) + self.deleted: list[tuple[str, str]] = [] + + async def connect(self): + pass + + async def disconnect(self): + pass + + async def send(self, chat_id, content="", **kwargs): + return SendResult(success=True, message_id="m-2") + + async def get_chat_info(self, chat_id): + return {} + + async def delete_message(self, chat_id: str, message_id: str) -> bool: + self.deleted.append((chat_id, message_id)) + return True + + +def _no_delete_adapter(): + return _NoDeleteAdapter( + PlatformConfig(enabled=True, token="t"), Platform.TELEGRAM + ) + + +def _delete_adapter(): + return _DeleteCapableAdapter( + PlatformConfig(enabled=True, token="t"), Platform.TELEGRAM + ) + + +def _make_event(text="/stop", chat_id="42"): + return MessageEvent( + text=text, + message_id="msg-1", + source=SessionSource( + platform=Platform.TELEGRAM, + chat_id=chat_id, + user_id="u-1", + ), + message_type=MessageType.TEXT, + ) + + +# --------------------------------------------------------------------------- +# _unwrap_ephemeral +# --------------------------------------------------------------------------- + + +def test_unwrap_plain_string_is_passthrough(): + adapter = _delete_adapter() + text, ttl = adapter._unwrap_ephemeral("hello") + assert text == "hello" + assert ttl == 0 + + +def test_unwrap_none_is_passthrough(): + adapter = _delete_adapter() + text, ttl = adapter._unwrap_ephemeral(None) + assert text is None + assert ttl == 0 + + +def test_unwrap_ephemeral_explicit_ttl_on_capable_adapter(): + adapter = _delete_adapter() + text, ttl = adapter._unwrap_ephemeral(EphemeralReply("bye", ttl_seconds=60)) + assert text == "bye" + assert ttl == 60 + + +def test_unwrap_ephemeral_zeros_ttl_on_incapable_adapter(): + """Platforms without delete_message should silently degrade to normal send.""" + adapter = _no_delete_adapter() + text, ttl = adapter._unwrap_ephemeral(EphemeralReply("bye", ttl_seconds=60)) + assert text == "bye" + assert ttl == 0 # forced to 0 — message will stay in place + + +def test_unwrap_ephemeral_default_ttl_from_config(): + adapter = _delete_adapter() + with patch.object(adapter, "_get_ephemeral_system_ttl_default", return_value=120): + text, ttl = adapter._unwrap_ephemeral(EphemeralReply("bye")) + assert text == "bye" + assert ttl == 120 + + +def test_unwrap_ephemeral_default_ttl_zero_disables(): + """Config default of 0 (the shipped default) means the feature is off.""" + adapter = _delete_adapter() + with patch.object(adapter, "_get_ephemeral_system_ttl_default", return_value=0): + text, ttl = adapter._unwrap_ephemeral(EphemeralReply("bye")) + assert text == "bye" + assert ttl == 0 + + +def test_unwrap_ephemeral_handles_unreadable_config(): + adapter = _delete_adapter() + with patch.object( + adapter, + "_get_ephemeral_system_ttl_default", + side_effect=RuntimeError("boom"), + ): + text, ttl = adapter._unwrap_ephemeral(EphemeralReply("bye")) + # Fall back to 0 rather than crashing the handler pipeline. + assert text == "bye" + assert ttl == 0 + + +# --------------------------------------------------------------------------- +# _schedule_ephemeral_delete +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_schedule_ephemeral_delete_calls_delete_after_ttl(): + adapter = _delete_adapter() + # Use a very short TTL to keep the test fast — the implementation + # floors sleeps at 1s via ``max(1, int(ttl_seconds))``. Patch asyncio.sleep + # inside the module under test; the test body uses the real one for + # scheduler pumping. + import gateway.platforms.base as base_module + + sleeps: list[float] = [] + _real_sleep = base_module.asyncio.sleep + + async def _fake_sleep(duration): + sleeps.append(duration) + # Yield control so the rest of the task body can run. + await _real_sleep(0) + + with patch.object(base_module.asyncio, "sleep", _fake_sleep): + adapter._schedule_ephemeral_delete( + chat_id="42", message_id="m-2", ttl_seconds=5 + ) + # Let the spawned task run. + for _ in range(5): + await _real_sleep(0) + + # Only the ttl sleep shows up — the test pump uses the real sleep. + assert 5 in sleeps + assert adapter.deleted == [("42", "m-2")] + + +@pytest.mark.asyncio +async def test_schedule_ephemeral_delete_swallows_errors(): + adapter = _delete_adapter() + + async def _boom(*a, **kw): + raise RuntimeError("permission denied") + + adapter.delete_message = _boom # type: ignore[assignment] + with patch("gateway.platforms.base.asyncio.sleep", AsyncMock()): + adapter._schedule_ephemeral_delete( + chat_id="42", message_id="m-2", ttl_seconds=1 + ) + # No exception should propagate even though delete_message raised. + for _ in range(5): + await asyncio.sleep(0) + + +def test_schedule_ephemeral_delete_outside_event_loop_is_noop(): + """No running loop → no crash, silently drops the request.""" + adapter = _delete_adapter() + # No pytest.mark.asyncio → no loop. Must not raise. + adapter._schedule_ephemeral_delete( + chat_id="42", message_id="m-2", ttl_seconds=1 + ) + assert adapter.deleted == [] + + +# --------------------------------------------------------------------------- +# _process_message_background unwraps EphemeralReply before send +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_process_message_unwraps_ephemeral_before_send(): + """The adapter must send the wrapper's .text, never the wrapper object.""" + adapter = _delete_adapter() + adapter._send_with_retry = AsyncMock( + return_value=SendResult(success=True, message_id="sent-1") + ) + + async def _handler(evt): + return EphemeralReply("⚡ Stopped.", ttl_seconds=5) + + adapter.set_message_handler(_handler) + + sleeps: list[float] = [] + + async def _fake_sleep(duration): + sleeps.append(duration) + + event = _make_event() + session_key = "agent:main:telegram:private:42" + with patch("gateway.platforms.base.asyncio.sleep", _fake_sleep), patch.object( + adapter, "_keep_typing", new=AsyncMock() + ): + await adapter._process_message_background(event, session_key) + # Pump until the detached delete task completes. + for _ in range(10): + await asyncio.sleep(0) + + # Sent text is the unwrapped string, NOT repr(EphemeralReply(...)) + adapter._send_with_retry.assert_called_once() + sent_text = adapter._send_with_retry.call_args.kwargs["content"] + assert sent_text == "⚡ Stopped." + # Auto-delete scheduled using the returned message_id + assert ("42", "sent-1") in adapter.deleted + + +@pytest.mark.asyncio +async def test_process_message_incapable_platform_does_not_schedule_delete(): + adapter = _no_delete_adapter() + adapter._send_with_retry = AsyncMock( + return_value=SendResult(success=True, message_id="sent-1") + ) + + async def _handler(evt): + return EphemeralReply("⚡ Stopped.", ttl_seconds=5) + + adapter.set_message_handler(_handler) + + # Spy on delete_message to confirm it is NOT invoked. + delete_calls: list = [] + + async def _spy_delete(chat_id, message_id): + delete_calls.append((chat_id, message_id)) + return False + + adapter.delete_message = _spy_delete # type: ignore[assignment] + + event = _make_event() + session_key = "agent:main:telegram:private:42" + with patch("gateway.platforms.base.asyncio.sleep", AsyncMock()), patch.object( + adapter, "_keep_typing", new=AsyncMock() + ): + await adapter._process_message_background(event, session_key) + for _ in range(10): + await asyncio.sleep(0) + + # Send happened with the unwrapped text... + adapter._send_with_retry.assert_called_once() + assert adapter._send_with_retry.call_args.kwargs["content"] == "⚡ Stopped." + # ...but delete was never scheduled because the capability check skipped + # the schedule call (TTL was zeroed in _unwrap_ephemeral). + # Note: the capability gate on _unwrap_ephemeral checks for + # ``type(adapter).delete_message is BasePlatformAdapter.delete_message``. + # Monkeypatching the instance does NOT change the class, so this test + # verifies the gate uses the class method to detect capability. + assert delete_calls == [] + + +@pytest.mark.asyncio +async def test_process_message_plain_string_behaves_unchanged(): + adapter = _delete_adapter() + adapter._send_with_retry = AsyncMock( + return_value=SendResult(success=True, message_id="sent-1") + ) + + async def _handler(evt): + return "plain reply" + + adapter.set_message_handler(_handler) + + event = _make_event() + session_key = "agent:main:telegram:private:42" + with patch("gateway.platforms.base.asyncio.sleep", AsyncMock()), patch.object( + adapter, "_keep_typing", new=AsyncMock() + ): + await adapter._process_message_background(event, session_key) + for _ in range(5): + await asyncio.sleep(0) + + adapter._send_with_retry.assert_called_once() + assert adapter._send_with_retry.call_args.kwargs["content"] == "plain reply" + assert adapter.deleted == [] # no auto-delete for plain replies