hermes-agent/tests/gateway/test_text_batching.py
kshitijk4poor cc8e5ec2af refactor(gateway): migrate Discord adapter to bundled plugin (full Teams parity)
First migration of an existing built-in platform adapter to the plugin
system established by IRC / Teams / LINE / Google Chat. Closes #24325;
advances the umbrella refactor in #3823.

Matches Teams' shape exactly — adapter under ``plugins/platforms/discord/``
with the standard ``__init__.py`` / ``adapter.py`` / ``plugin.yaml``
shell, ``register(ctx)`` entry point, **no back-compat shim** at the old
import path, and full parity for the four hooks Teams uses plus the
``apply_yaml_config_fn`` hook that landed in #25443 (the Discord plugin
is the first consumer of that hook):

* ``standalone_sender_fn`` — out-of-process cron delivery via REST API
* ``setup_fn`` — interactive ``hermes setup gateway`` wizard
* ``apply_yaml_config_fn`` — translate ``config.yaml`` ``discord:`` keys
  into ``DISCORD_*`` env vars (replaces the hardcoded block in
  ``gateway/config.py``)
* ``is_connected`` — declares connection state from ``DISCORD_BOT_TOKEN``
* ``check_fn`` — lazy-installs ``discord.py`` on demand
* plus ``allowed_users_env``, ``allow_all_env``, ``cron_deliver_env_var``,
  ``max_message_length``, ``emoji``, ``required_env``, ``install_hint``

* ``gateway/platforms/discord.py`` (5,101 LOC) →
  ``plugins/platforms/discord/adapter.py`` (git rename, R090).
* New ``plugins/platforms/discord/{__init__.py, plugin.yaml}`` with
  ``requires_env`` / ``optional_env`` declarations.
* Append ``register(ctx)`` block + new hook implementations
  (``_standalone_send``, ``interactive_setup``, ``_apply_yaml_config``,
  ``_clean_discord_user_ids``, ``_is_connected``, ``_build_adapter``,
  plus helpers ``_DISCORD_CHANNEL_TYPE_PROBE_CACHE`` etc.) to the
  adapter.

* Replace the ``Platform.DISCORD elif`` branch in
  ``GatewayRunner._create_adapter()`` (−9 LOC) with a generic post-creation
  hook (+6 LOC) in the registry path: any plugin adapter that declares a
  ``gateway_runner`` attribute now gets it auto-injected. Webhook's
  built-in branch is unchanged (it doesn't go through the registry path).

* Move ``_send_discord`` (190 LOC) and helpers
  (``_DISCORD_CHANNEL_TYPE_PROBE_CACHE``, ``_remember_channel_is_forum``,
  ``_probe_is_forum_cached``, ``_derive_forum_thread_name``) from
  ``tools/send_message_tool.py`` into the plugin as ``_standalone_send``.
* Wire via ``standalone_sender_fn=_standalone_send`` (Teams pattern; same
  gap fixed in #21804 for other plugin platforms).
* Replace the Discord ``elif`` in ``tools/send_message_tool.py``
  ``_send_to_platform`` with a 10-line registry-hook dispatch.
* Drop the ``DiscordAdapter`` import and the
  ``Platform.DISCORD: DiscordAdapter.MAX_MESSAGE_LENGTH`` ``_MAX_LENGTHS``
  entry — the registry's ``max_message_length=2000`` covers it.

* Move ``_setup_discord`` and ``_clean_discord_user_ids`` (68 LOC) from
  ``hermes_cli/setup.py`` into the plugin as ``interactive_setup``.
* Wire via ``setup_fn=interactive_setup``.  CLI helpers (``prompt``,
  ``print_info``, etc.) are lazy-imported so the plugin's module-load
  surface stays minimal.
* Remove ``"discord": _s._setup_discord`` from
  ``hermes_cli/gateway.py::_builtin_setup_fn``.
* Remove the entire 32-line ``_PLATFORMS["discord"]`` static dict entry —
  Discord's setup metadata is now discovered dynamically via
  ``_all_platforms()`` from the registry entry.

* Move the 59-line ``discord_cfg`` YAML→env bridge from
  ``gateway/config.py::load_gateway_config()`` into the plugin as
  ``_apply_yaml_config``.  Covers ``require_mention``,
  ``thread_require_mention``, ``free_response_channels``, ``auto_thread``,
  ``reactions``, ``ignored_channels``, ``allowed_channels``,
  ``no_thread_channels``, ``allow_mentions.{everyone,roles,users,
  replied_user}``, and ``reply_to_mode`` (including the YAML 1.1
  ``off``-as-False coercion and the ``extra.reply_to_mode`` fallback).
* Wire via ``apply_yaml_config_fn=_apply_yaml_config``.
* The hook runs BEFORE ``_apply_env_overrides`` and after the generic
  shared-key loop, exactly as documented in
  ``website/docs/developer-guide/adding-platform-adapters.md``.
* Behavior is preserved exactly — every assignment still uses
  ``not os.getenv(...)`` guards so env vars take precedence over YAML.

All 78 references to the old import path are rewritten — no back-compat
shim:

* 51 ``from gateway.platforms.discord import X`` →
  ``from plugins.platforms.discord.adapter import X``
* 5 ``import gateway.platforms.discord as discord_platform`` →
  ``import plugins.platforms.discord.adapter as discord_platform``
* 1 ``from gateway.platforms import discord as discord_mod`` →
  ``from plugins.platforms.discord import adapter as discord_mod``
* 21 ``mock.patch("gateway.platforms.discord.X")`` strings →
  ``mock.patch("plugins.platforms.discord.adapter.X")``
* 1 docstring reference in ``hermes_cli/commands.py``
* 1 import in ``tools/send_message_tool.py`` (now removed entirely)

The import-safety test in ``tests/gateway/test_discord_imports.py`` is
updated to purge the new canonical module name from ``sys.modules``.

**38 files changed, +621 / −473** — net positive due to the YAML hook
implementation (89 new LOC in the plugin trading for 59 deleted in core),
but every line moved has a clear plugin home now.  The git rename is
detected at R090 because the adapter gained ~340 LOC of moved-in hook
implementations (``_standalone_send`` + ``interactive_setup`` +
``_apply_yaml_config`` + helpers).

* All 568 Discord-specific tests pass across 25 ``test_discord_*.py``
  files plus voice/send/text-batching/reload-skills/stream-consumer/
  integration tests.
* All 147 tests in the YAML-touching subset
  (``test_discord_reply_mode``, ``test_discord_free_response``,
  ``test_discord_allowed_channels``, ``test_discord_allowed_mentions``,
  ``test_discord_channel_controls``, ``test_discord_reactions``,
  ``test_discord_thread_persistence``, ``test_runtime_footer``) pass —
  this is the strongest signal that the YAML→env hook behaves
  identically to the legacy block.
* Broader gateway/cron/integration sweep (1297 tests) introduces zero
  new failures vs ``main``.  Pre-existing failures in
  ``tests/gateway/test_tts_media_routing.py`` and
  ``tests/e2e/test_platform_commands.py`` reproduce identically on the
  unchanged ``main`` revision.
* Plugin discovery sanity check confirms Discord registers alongside the
  other four platform plugins:

    Registered platforms: ['discord', 'google_chat', 'irc', 'line', 'teams']

These Discord-shaped tendrils in core were **deliberately not moved** —
they are generic platform-registry concerns affecting every platform,
not Discord-specific:

* ``gateway/config.py:1205`` ``DISCORD_BOT_TOKEN → config.token`` env
  enablement — same shape Telegram has.  The existing
  ``env_enablement_fn`` registry hook only seeds ``extra``, not
  ``.token``, so it can't replace this without an adapter refactor to
  read from ``extra["bot_token"]``.
* ``gateway/run.py`` voice-mode hooks
  (``self.adapters.get(Platform.DISCORD)`` for
  ``start_voice_mode``/``stop_voice_mode``), role-based auth,
  ``DISCORD_ALLOW_BOTS`` branch in ``_is_user_authorized``,
  ``_UPDATE_ALLOWED_PLATFORMS`` frozenset, and the per-platform
  allowlist maps — generic platform-registry concerns.
* ``Platform.DISCORD`` enum literal — stable identifier used as dict
  keys throughout the codebase; removing it is a separate refactor with
  no real benefit.
* ``tools/discord_tool.py`` and ``tools/environments/local.py`` —
  first-class agent tools and env-passthrough config, neither is the
  gateway adapter.

Each of these is worth its own scoping issue when the time comes.
2026-05-22 14:21:41 -07:00

512 lines
19 KiB
Python

"""Tests for text message batching across all gateway adapters.
When a user sends a long message, the messaging client splits it at the
platform's character limit. Each adapter should buffer rapid successive
text messages from the same session and aggregate them before dispatching.
Covers: Discord, Matrix, WeCom, and the adaptive delay logic for
Telegram and Feishu.
"""
import asyncio
import os
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType, SessionSource
# =====================================================================
# Helpers
# =====================================================================
def _make_event(
text: str,
platform: Platform,
chat_id: str = "12345",
msg_type: MessageType = MessageType.TEXT,
) -> MessageEvent:
return MessageEvent(
text=text,
message_type=msg_type,
source=SessionSource(platform=platform, chat_id=chat_id, chat_type="dm"),
)
# =====================================================================
# Discord text batching
# =====================================================================
def _make_discord_adapter():
"""Create a minimal DiscordAdapter for testing text batching."""
from plugins.platforms.discord.adapter import DiscordAdapter
config = PlatformConfig(enabled=True, token="test-token")
adapter = object.__new__(DiscordAdapter)
adapter._platform = Platform.DISCORD
adapter.config = config
adapter._pending_text_batches = {}
adapter._pending_text_batch_tasks = {}
adapter._text_batch_delay_seconds = 0.1 # fast for tests
adapter._text_batch_split_delay_seconds = 0.3 # fast for tests
adapter._active_sessions = {}
adapter._pending_messages = {}
adapter._message_handler = AsyncMock()
adapter.handle_message = AsyncMock()
return adapter
class TestDiscordTextBatching:
@pytest.mark.asyncio
async def test_single_message_dispatched_after_delay(self):
adapter = _make_discord_adapter()
event = _make_event("hello world", Platform.DISCORD)
adapter._enqueue_text_event(event)
# Not dispatched yet
adapter.handle_message.assert_not_called()
# Wait for flush
await asyncio.sleep(0.2)
adapter.handle_message.assert_called_once()
dispatched = adapter.handle_message.call_args[0][0]
assert dispatched.text == "hello world"
@pytest.mark.asyncio
async def test_split_messages_aggregated(self):
"""Two rapid messages from the same chat should be merged."""
adapter = _make_discord_adapter()
adapter._enqueue_text_event(_make_event("Part one of a long", Platform.DISCORD))
await asyncio.sleep(0.02)
adapter._enqueue_text_event(_make_event("message that was split.", Platform.DISCORD))
adapter.handle_message.assert_not_called()
await asyncio.sleep(0.2)
adapter.handle_message.assert_called_once()
text = adapter.handle_message.call_args[0][0].text
assert "Part one" in text
assert "split" in text
@pytest.mark.asyncio
async def test_three_way_split_aggregated(self):
adapter = _make_discord_adapter()
adapter._enqueue_text_event(_make_event("chunk 1", Platform.DISCORD))
await asyncio.sleep(0.02)
adapter._enqueue_text_event(_make_event("chunk 2", Platform.DISCORD))
await asyncio.sleep(0.02)
adapter._enqueue_text_event(_make_event("chunk 3", Platform.DISCORD))
await asyncio.sleep(0.2)
adapter.handle_message.assert_called_once()
text = adapter.handle_message.call_args[0][0].text
assert "chunk 1" in text
assert "chunk 2" in text
assert "chunk 3" in text
@pytest.mark.asyncio
async def test_different_chats_not_merged(self):
adapter = _make_discord_adapter()
adapter._enqueue_text_event(_make_event("from A", Platform.DISCORD, chat_id="111"))
adapter._enqueue_text_event(_make_event("from B", Platform.DISCORD, chat_id="222"))
await asyncio.sleep(0.2)
assert adapter.handle_message.call_count == 2
@pytest.mark.asyncio
async def test_batch_cleans_up_after_flush(self):
adapter = _make_discord_adapter()
adapter._enqueue_text_event(_make_event("test", Platform.DISCORD))
await asyncio.sleep(0.2)
assert len(adapter._pending_text_batches) == 0
@pytest.mark.asyncio
async def test_adaptive_delay_for_near_limit_chunk(self):
"""Chunks near the 2000-char limit should trigger longer delay."""
adapter = _make_discord_adapter()
# Simulate a chunk near Discord's 2000-char split point
long_text = "x" * 1950
adapter._enqueue_text_event(_make_event(long_text, Platform.DISCORD))
# After the short delay (0.1s), should NOT have flushed yet (split delay is 0.3s)
await asyncio.sleep(0.15)
adapter.handle_message.assert_not_called()
# After the split delay, should be flushed
await asyncio.sleep(0.25)
adapter.handle_message.assert_called_once()
@pytest.mark.asyncio
async def test_shield_protects_handle_message_from_cancel(self):
"""Regression guard: a follow-up chunk arriving while
handle_message is mid-flight must NOT cancel the running
dispatch. _enqueue_text_event fires prior_task.cancel() on
every new chunk; without asyncio.shield around handle_message
the cancel propagates into the agent's streaming request and
aborts the response.
"""
adapter = _make_discord_adapter()
handle_started = asyncio.Event()
release_handle = asyncio.Event()
first_handle_cancelled = asyncio.Event()
first_handle_completed = asyncio.Event()
call_count = [0]
async def slow_handle(event):
call_count[0] += 1
# Only the first call (batch 1) is the one we're protecting.
if call_count[0] == 1:
handle_started.set()
try:
await release_handle.wait()
first_handle_completed.set()
except asyncio.CancelledError:
first_handle_cancelled.set()
raise
# Second call (batch 2) returns immediately — not the subject
# of this test.
adapter.handle_message = slow_handle
# Prime batch 1 and wait for it to land inside handle_message.
adapter._enqueue_text_event(_make_event("batch 1", Platform.DISCORD))
await asyncio.wait_for(handle_started.wait(), timeout=1.0)
# A new chunk arrives — _enqueue_text_event fires
# prior_task.cancel() on batch 1's flush task, which is
# currently awaiting inside handle_message.
adapter._enqueue_text_event(_make_event("batch 2 follow-up", Platform.DISCORD))
# Let the cancel propagate.
await asyncio.sleep(0.05)
# CRITICAL ASSERTION: batch 1's handle_message must NOT have
# been cancelled. Without asyncio.shield this assertion fails
# because CancelledError propagates from the flush task's
# `await self.handle_message(event)` into slow_handle.
assert not first_handle_cancelled.is_set(), (
"handle_message for batch 1 was cancelled by a follow-up "
"chunk — asyncio.shield is missing or broken"
)
# Release batch 1's handle_message and let it complete.
release_handle.set()
await asyncio.wait_for(first_handle_completed.wait(), timeout=1.0)
assert first_handle_completed.is_set()
# Cleanup
for task in list(adapter._pending_text_batch_tasks.values()):
task.cancel()
await asyncio.sleep(0.01)
# =====================================================================
# Matrix text batching
# =====================================================================
def _make_matrix_adapter():
"""Create a minimal MatrixAdapter for testing text batching."""
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(enabled=True, token="test-token")
adapter = object.__new__(MatrixAdapter)
adapter._platform = Platform.MATRIX
adapter.config = config
adapter._pending_text_batches = {}
adapter._pending_text_batch_tasks = {}
adapter._text_batch_delay_seconds = 0.1
adapter._text_batch_split_delay_seconds = 0.3
adapter._active_sessions = {}
adapter._pending_messages = {}
adapter._message_handler = AsyncMock()
adapter.handle_message = AsyncMock()
return adapter
class TestMatrixTextBatching:
@pytest.mark.asyncio
async def test_single_message_dispatched_after_delay(self):
adapter = _make_matrix_adapter()
event = _make_event("hello world", Platform.MATRIX)
adapter._enqueue_text_event(event)
adapter.handle_message.assert_not_called()
await asyncio.sleep(0.2)
adapter.handle_message.assert_called_once()
assert adapter.handle_message.call_args[0][0].text == "hello world"
@pytest.mark.asyncio
async def test_split_messages_aggregated(self):
adapter = _make_matrix_adapter()
adapter._enqueue_text_event(_make_event("first part", Platform.MATRIX))
await asyncio.sleep(0.02)
adapter._enqueue_text_event(_make_event("second part", Platform.MATRIX))
adapter.handle_message.assert_not_called()
await asyncio.sleep(0.2)
adapter.handle_message.assert_called_once()
text = adapter.handle_message.call_args[0][0].text
assert "first part" in text
assert "second part" in text
@pytest.mark.asyncio
async def test_different_rooms_not_merged(self):
adapter = _make_matrix_adapter()
adapter._enqueue_text_event(_make_event("room A", Platform.MATRIX, chat_id="!aaa:matrix.org"))
adapter._enqueue_text_event(_make_event("room B", Platform.MATRIX, chat_id="!bbb:matrix.org"))
await asyncio.sleep(0.2)
assert adapter.handle_message.call_count == 2
@pytest.mark.asyncio
async def test_adaptive_delay_for_near_limit_chunk(self):
"""Chunks near the 4000-char limit should trigger longer delay."""
adapter = _make_matrix_adapter()
long_text = "x" * 3950
adapter._enqueue_text_event(_make_event(long_text, Platform.MATRIX))
await asyncio.sleep(0.15)
adapter.handle_message.assert_not_called()
await asyncio.sleep(0.25)
adapter.handle_message.assert_called_once()
@pytest.mark.asyncio
async def test_batch_cleans_up_after_flush(self):
adapter = _make_matrix_adapter()
adapter._enqueue_text_event(_make_event("test", Platform.MATRIX))
await asyncio.sleep(0.2)
assert len(adapter._pending_text_batches) == 0
# =====================================================================
# WeCom text batching
# =====================================================================
def _make_wecom_adapter():
"""Create a minimal WeComAdapter for testing text batching."""
from gateway.platforms.wecom import WeComAdapter
config = PlatformConfig(enabled=True, token="test-token")
adapter = object.__new__(WeComAdapter)
adapter._platform = Platform.WECOM
adapter.config = config
adapter._pending_text_batches = {}
adapter._pending_text_batch_tasks = {}
adapter._text_batch_delay_seconds = 0.1
adapter._text_batch_split_delay_seconds = 0.3
adapter._active_sessions = {}
adapter._pending_messages = {}
adapter._message_handler = AsyncMock()
adapter.handle_message = AsyncMock()
return adapter
class TestWeComTextBatching:
@pytest.mark.asyncio
async def test_single_message_dispatched_after_delay(self):
adapter = _make_wecom_adapter()
event = _make_event("hello world", Platform.WECOM)
adapter._enqueue_text_event(event)
adapter.handle_message.assert_not_called()
await asyncio.sleep(0.2)
adapter.handle_message.assert_called_once()
assert adapter.handle_message.call_args[0][0].text == "hello world"
@pytest.mark.asyncio
async def test_split_messages_aggregated(self):
adapter = _make_wecom_adapter()
adapter._enqueue_text_event(_make_event("first part", Platform.WECOM))
await asyncio.sleep(0.02)
adapter._enqueue_text_event(_make_event("second part", Platform.WECOM))
adapter.handle_message.assert_not_called()
await asyncio.sleep(0.2)
adapter.handle_message.assert_called_once()
text = adapter.handle_message.call_args[0][0].text
assert "first part" in text
assert "second part" in text
@pytest.mark.asyncio
async def test_different_chats_not_merged(self):
adapter = _make_wecom_adapter()
adapter._enqueue_text_event(_make_event("chat A", Platform.WECOM, chat_id="chat_a"))
adapter._enqueue_text_event(_make_event("chat B", Platform.WECOM, chat_id="chat_b"))
await asyncio.sleep(0.2)
assert adapter.handle_message.call_count == 2
@pytest.mark.asyncio
async def test_adaptive_delay_for_near_limit_chunk(self):
"""Chunks near the 4000-char limit should trigger longer delay."""
adapter = _make_wecom_adapter()
long_text = "x" * 3950
adapter._enqueue_text_event(_make_event(long_text, Platform.WECOM))
await asyncio.sleep(0.15)
adapter.handle_message.assert_not_called()
await asyncio.sleep(0.25)
adapter.handle_message.assert_called_once()
@pytest.mark.asyncio
async def test_batch_cleans_up_after_flush(self):
adapter = _make_wecom_adapter()
adapter._enqueue_text_event(_make_event("test", Platform.WECOM))
await asyncio.sleep(0.2)
assert len(adapter._pending_text_batches) == 0
# =====================================================================
# Telegram adaptive delay (PR #6891)
# =====================================================================
def _make_telegram_adapter():
"""Create a minimal TelegramAdapter for testing adaptive delay."""
from gateway.platforms.telegram import TelegramAdapter
config = PlatformConfig(enabled=True, token="test-token")
adapter = object.__new__(TelegramAdapter)
adapter._platform = Platform.TELEGRAM
adapter.config = config
adapter._pending_text_batches = {}
adapter._pending_text_batch_tasks = {}
adapter._text_batch_delay_seconds = 0.1
adapter._text_batch_split_delay_seconds = 0.3
adapter._active_sessions = {}
adapter._pending_messages = {}
adapter._message_handler = AsyncMock()
adapter.handle_message = AsyncMock()
return adapter
class TestTelegramAdaptiveDelay:
@pytest.mark.asyncio
async def test_short_chunk_uses_normal_delay(self):
adapter = _make_telegram_adapter()
adapter._enqueue_text_event(_make_event("short msg", Platform.TELEGRAM))
# Should flush after the normal 0.1s delay
await asyncio.sleep(0.15)
adapter.handle_message.assert_called_once()
@pytest.mark.asyncio
async def test_near_limit_chunk_uses_split_delay(self):
"""A chunk near the 4096-char limit should trigger longer delay."""
adapter = _make_telegram_adapter()
long_text = "x" * 4050 # near the 4096 limit
adapter._enqueue_text_event(_make_event(long_text, Platform.TELEGRAM))
# After the short delay, should NOT have flushed yet
await asyncio.sleep(0.15)
adapter.handle_message.assert_not_called()
# After the split delay, should be flushed
await asyncio.sleep(0.25)
adapter.handle_message.assert_called_once()
@pytest.mark.asyncio
async def test_split_continuation_merged(self):
"""Two near-limit chunks should both be merged."""
adapter = _make_telegram_adapter()
adapter._enqueue_text_event(_make_event("x" * 4050, Platform.TELEGRAM))
await asyncio.sleep(0.05)
adapter._enqueue_text_event(_make_event("continuation text", Platform.TELEGRAM))
# Short chunk arrived → should use normal delay now
await asyncio.sleep(0.15)
adapter.handle_message.assert_called_once()
text = adapter.handle_message.call_args[0][0].text
assert "continuation text" in text
# =====================================================================
# Feishu adaptive delay
# =====================================================================
def _make_feishu_adapter():
"""Create a minimal FeishuAdapter for testing adaptive delay."""
from gateway.platforms.feishu import FeishuAdapter, FeishuBatchState
config = PlatformConfig(enabled=True, token="test-token")
adapter = object.__new__(FeishuAdapter)
adapter._platform = Platform.FEISHU
adapter.config = config
batch_state = FeishuBatchState()
adapter._pending_text_batches = batch_state.events
adapter._pending_text_batch_tasks = batch_state.tasks
adapter._pending_text_batch_counts = batch_state.counts
adapter._text_batch_delay_seconds = 0.1
adapter._text_batch_split_delay_seconds = 0.3
adapter._text_batch_max_messages = 20
adapter._text_batch_max_chars = 50000
adapter._active_sessions = {}
adapter._pending_messages = {}
adapter._message_handler = AsyncMock()
adapter._handle_message_with_guards = AsyncMock()
return adapter
class TestFeishuAdaptiveDelay:
@pytest.mark.asyncio
async def test_short_chunk_uses_normal_delay(self):
adapter = _make_feishu_adapter()
event = _make_event("short msg", Platform.FEISHU)
await adapter._enqueue_text_event(event)
await asyncio.sleep(0.15)
adapter._handle_message_with_guards.assert_called_once()
@pytest.mark.asyncio
async def test_near_limit_chunk_uses_split_delay(self):
"""A chunk near the 4096-char limit should trigger longer delay."""
adapter = _make_feishu_adapter()
long_text = "x" * 4050
event = _make_event(long_text, Platform.FEISHU)
await adapter._enqueue_text_event(event)
await asyncio.sleep(0.15)
adapter._handle_message_with_guards.assert_not_called()
await asyncio.sleep(0.25)
adapter._handle_message_with_guards.assert_called_once()
@pytest.mark.asyncio
async def test_split_continuation_merged(self):
adapter = _make_feishu_adapter()
await adapter._enqueue_text_event(_make_event("x" * 4050, Platform.FEISHU))
await asyncio.sleep(0.05)
await adapter._enqueue_text_event(_make_event("continuation text", Platform.FEISHU))
await asyncio.sleep(0.15)
adapter._handle_message_with_guards.assert_called_once()
text = adapter._handle_message_with_guards.call_args[0][0].text
assert "continuation text" in text