diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py index 4b4fa0da4e..85cebe5381 100644 --- a/gateway/platforms/feishu.py +++ b/gateway/platforms/feishu.py @@ -8,7 +8,8 @@ Supports: - Gateway allowlist integration via FEISHU_ALLOWED_USERS - Persistent dedup state across restarts - Per-chat serial message processing (matches openclaw createChatQueue) -- Persistent ACK emoji reaction on inbound messages +- Processing status reactions: Typing while working, removed on success, + swapped for CrossMark on failure - Reaction events routed as synthetic text events (matches openclaw) - Interactive card button-click events routed as synthetic COMMAND events - Webhook anomaly tracking (matches openclaw createWebhookAnomalyTracker) @@ -29,6 +30,7 @@ import re import threading import time import uuid +from collections import OrderedDict from dataclasses import dataclass, field from datetime import datetime from pathlib import Path @@ -98,6 +100,7 @@ from gateway.platforms.base import ( BasePlatformAdapter, MessageEvent, MessageType, + ProcessingOutcome, SendResult, SUPPORTED_DOCUMENT_TYPES, cache_document_from_bytes, @@ -190,7 +193,17 @@ _APPROVAL_LABEL_MAP: Dict[str, str] = { } _FEISHU_BOT_MSG_TRACK_SIZE = 512 # LRU size for tracking sent message IDs _FEISHU_REPLY_FALLBACK_CODES = frozenset({230011, 231003}) # reply target withdrawn/missing → create fallback -_FEISHU_ACK_EMOJI = "OK" + +# Feishu reactions render as prominent badges, unlike Discord/Telegram's +# small footer emoji — a success badge on every message would add noise, so +# we only mark start (Typing) and failure (CrossMark); the reply itself is +# the success signal. +_FEISHU_REACTION_IN_PROGRESS = "Typing" +_FEISHU_REACTION_FAILURE = "CrossMark" +# Bound on the (message_id → reaction_id) handle cache. Happy-path entries +# drain on completion; the cap is a safeguard against unbounded growth from +# delete-failures, not a capacity plan. 
+_FEISHU_PROCESSING_REACTION_CACHE_SIZE = 1024 # QR onboarding constants _ONBOARD_ACCOUNTS_URLS = { @@ -1141,6 +1154,9 @@ class FeishuAdapter(BasePlatformAdapter): # Exec approval button state (approval_id → {session_key, message_id, chat_id}) self._approval_state: Dict[int, Dict[str, str]] = {} self._approval_counter = itertools.count(1) + # Feishu reaction deletion requires the opaque reaction_id returned + # by create, so we cache it per message_id. + self._pending_processing_reactions: "OrderedDict[str, str]" = OrderedDict() self._load_seen_message_ids() @staticmethod @@ -2050,12 +2066,12 @@ class FeishuAdapter(BasePlatformAdapter): operator_type, emoji_type, ) - # Only process reactions from real users. Ignore app/bot-generated reactions - # and Hermes' own ACK emoji to avoid feedback loops. + # Drop bot/app-origin reactions to break the feedback loop from our + # own lifecycle reactions. A human reacting with the same emoji (e.g. + # clicking Typing on a bot message) is still routed through. loop = self._loop if ( operator_type in {"bot", "app"} - or emoji_type == _FEISHU_ACK_EMOJI or not message_id or loop is None or bool(getattr(loop, "is_closed", lambda: False)()) @@ -2279,33 +2295,35 @@ class FeishuAdapter(BasePlatformAdapter): async def _handle_message_with_guards(self, event: MessageEvent) -> None: """Dispatch a single event through the agent pipeline with per-chat serialization - and a persistent ACK emoji reaction before processing starts. + before handing the event off to the agent. - - Per-chat lock: ensures messages in the same chat are processed one at a time - (matches openclaw's createChatQueue serial queue behaviour). - - ACK indicator: adds a CHECK reaction to the triggering message before handing - off to the agent and leaves it in place as a receipt marker. + Per-chat lock ensures messages in the same chat are processed one at a + time (matches openclaw's createChatQueue serial queue behaviour). 
""" chat_id = getattr(event.source, "chat_id", "") or "" if event.source else "" chat_lock = self._get_chat_lock(chat_id) async with chat_lock: - message_id = event.message_id - if message_id: - await self._add_ack_reaction(message_id) await self.handle_message(event) - async def _add_ack_reaction(self, message_id: str) -> Optional[str]: - """Add a persistent ACK emoji reaction to signal the message was received.""" - if not self._client or not message_id: + # ========================================================================= + # Processing status reactions + # ========================================================================= + + def _reactions_enabled(self) -> bool: + return os.getenv("FEISHU_REACTIONS", "true").strip().lower() not in ("false", "0", "no") + + async def _add_reaction(self, message_id: str, emoji_type: str) -> Optional[str]: + """Return the reaction_id on success, else None. The id is needed later for deletion.""" + if not self._client or not message_id or not emoji_type: return None try: - from lark_oapi.api.im.v1 import ( # lazy import — keeps optional dep optional + from lark_oapi.api.im.v1 import ( CreateMessageReactionRequest, CreateMessageReactionRequestBody, ) body = ( CreateMessageReactionRequestBody.builder() - .reaction_type({"emoji_type": _FEISHU_ACK_EMOJI}) + .reaction_type({"emoji_type": emoji_type}) .build() ) request = ( @@ -2318,16 +2336,93 @@ class FeishuAdapter(BasePlatformAdapter): if response and getattr(response, "success", lambda: False)(): data = getattr(response, "data", None) return getattr(data, "reaction_id", None) - logger.warning( - "[Feishu] Failed to add ack reaction to %s: code=%s msg=%s", + logger.debug( + "[Feishu] Add reaction %s on %s rejected: code=%s msg=%s", + emoji_type, message_id, getattr(response, "code", None), getattr(response, "msg", None), ) except Exception: - logger.warning("[Feishu] Failed to add ack reaction to %s", message_id, exc_info=True) + logger.warning( + "[Feishu] Add reaction 
%s on %s raised", + emoji_type, + message_id, + exc_info=True, + ) return None + async def _remove_reaction(self, message_id: str, reaction_id: str) -> bool: + if not self._client or not message_id or not reaction_id: + return False + try: + from lark_oapi.api.im.v1 import DeleteMessageReactionRequest + request = ( + DeleteMessageReactionRequest.builder() + .message_id(message_id) + .reaction_id(reaction_id) + .build() + ) + response = await asyncio.to_thread(self._client.im.v1.message_reaction.delete, request) + if response and getattr(response, "success", lambda: False)(): + return True + logger.debug( + "[Feishu] Remove reaction %s on %s rejected: code=%s msg=%s", + reaction_id, + message_id, + getattr(response, "code", None), + getattr(response, "msg", None), + ) + except Exception: + logger.warning( + "[Feishu] Remove reaction %s on %s raised", + reaction_id, + message_id, + exc_info=True, + ) + return False + + def _remember_processing_reaction(self, message_id: str, reaction_id: str) -> None: + cache = self._pending_processing_reactions + cache[message_id] = reaction_id + cache.move_to_end(message_id) + while len(cache) > _FEISHU_PROCESSING_REACTION_CACHE_SIZE: + cache.popitem(last=False) + + def _pop_processing_reaction(self, message_id: str) -> Optional[str]: + return self._pending_processing_reactions.pop(message_id, None) + + async def on_processing_start(self, event: MessageEvent) -> None: + if not self._reactions_enabled(): + return + message_id = event.message_id + if not message_id or message_id in self._pending_processing_reactions: + return + reaction_id = await self._add_reaction(message_id, _FEISHU_REACTION_IN_PROGRESS) + if reaction_id: + self._remember_processing_reaction(message_id, reaction_id) + + async def on_processing_complete( + self, event: MessageEvent, outcome: ProcessingOutcome + ) -> None: + if not self._reactions_enabled(): + return + message_id = event.message_id + if not message_id: + return + + start_reaction_id = 
self._pending_processing_reactions.get(message_id) + if start_reaction_id: + if not await self._remove_reaction(message_id, start_reaction_id): + # Don't stack a second badge on top of a Typing we couldn't + # remove — UI would read as both "working" and "done/failed" + # simultaneously. Keep the handle so LRU eventually evicts it. + return + self._pop_processing_reaction(message_id) + + if outcome is ProcessingOutcome.FAILURE: + await self._add_reaction(message_id, _FEISHU_REACTION_FAILURE) + # ========================================================================= # Webhook server and security # ========================================================================= diff --git a/tests/gateway/test_feishu.py b/tests/gateway/test_feishu.py index 21ef6a4276..1813eb31f5 100644 --- a/tests/gateway/test_feishu.py +++ b/tests/gateway/test_feishu.py @@ -10,6 +10,8 @@ from pathlib import Path from types import SimpleNamespace from unittest.mock import AsyncMock, Mock, patch +from gateway.platforms.base import ProcessingOutcome + try: import lark_oapi _HAS_LARK_OAPI = True @@ -638,83 +640,54 @@ class TestAdapterBehavior(unittest.TestCase): ) @patch.dict(os.environ, {}, clear=True) - @unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed") - def test_add_ack_reaction_uses_ok_emoji(self): - from gateway.config import PlatformConfig - from gateway.platforms.feishu import FeishuAdapter - - adapter = FeishuAdapter(PlatformConfig()) - captured = {} - - class _ReactionAPI: - def create(self, request): - captured["request"] = request - return SimpleNamespace( - success=lambda: True, - data=SimpleNamespace(reaction_id="r_typing"), - ) - - adapter._client = SimpleNamespace( - im=SimpleNamespace(v1=SimpleNamespace(message_reaction=_ReactionAPI())) - ) - - async def _direct(func, *args, **kwargs): - return func(*args, **kwargs) - - with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): - reaction_id = asyncio.run(adapter._add_ack_reaction("om_msg")) 
- - self.assertEqual(reaction_id, "r_typing") - self.assertEqual(captured["request"].request_body.reaction_type["emoji_type"], "OK") - - @patch.dict(os.environ, {}, clear=True) - def test_add_ack_reaction_logs_warning_on_failure(self): - from gateway.config import PlatformConfig - from gateway.platforms.feishu import FeishuAdapter - - adapter = FeishuAdapter(PlatformConfig()) - - class _ReactionAPI: - def create(self, request): - raise RuntimeError("boom") - - adapter._client = SimpleNamespace( - im=SimpleNamespace(v1=SimpleNamespace(message_reaction=_ReactionAPI())) - ) - - async def _direct(func, *args, **kwargs): - return func(*args, **kwargs) - - with ( - patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct), - self.assertLogs("gateway.platforms.feishu", level="WARNING") as logs, - ): - reaction_id = asyncio.run(adapter._add_ack_reaction("om_msg")) - - self.assertIsNone(reaction_id) - self.assertTrue( - any("Failed to add ack reaction to om_msg" in entry for entry in logs.output), - logs.output, - ) - - @patch.dict(os.environ, {}, clear=True) - def test_ack_reaction_events_are_ignored_to_avoid_feedback_loops(self): + def test_bot_origin_reactions_are_dropped_to_avoid_feedback_loops(self): from gateway.config import PlatformConfig from gateway.platforms.feishu import FeishuAdapter adapter = FeishuAdapter(PlatformConfig()) adapter._loop = object() + + for emoji in ("Typing", "CrossMark"): + event = SimpleNamespace( + message_id="om_msg", + operator_type="bot", + reaction_type=SimpleNamespace(emoji_type=emoji), + ) + data = SimpleNamespace(event=event) + with patch( + "gateway.platforms.feishu.asyncio.run_coroutine_threadsafe" + ) as run_threadsafe: + adapter._on_reaction_event("im.message.reaction.created_v1", data) + run_threadsafe.assert_not_called() + + @patch.dict(os.environ, {}, clear=True) + def test_user_reaction_with_managed_emoji_is_still_routed(self): + # Operator-origin filter is enough to prevent feedback loops; we must + # not 
additionally swallow user-origin reactions just because their + # emoji happens to collide with a lifecycle emoji. + from gateway.config import PlatformConfig + from gateway.platforms.feishu import FeishuAdapter + + adapter = FeishuAdapter(PlatformConfig()) + adapter._loop = SimpleNamespace(is_closed=lambda: False) + event = SimpleNamespace( message_id="om_msg", operator_type="user", - reaction_type=SimpleNamespace(emoji_type="OK"), + reaction_type=SimpleNamespace(emoji_type="Typing"), ) data = SimpleNamespace(event=event) - with patch("gateway.platforms.feishu.asyncio.run_coroutine_threadsafe") as run_threadsafe: - adapter._on_reaction_event("im.message.reaction.created_v1", data) + def _close_coro_and_return_future(coro, _loop): + coro.close() + return SimpleNamespace(add_done_callback=lambda _: None) - run_threadsafe.assert_not_called() + with patch( + "gateway.platforms.feishu.asyncio.run_coroutine_threadsafe", + side_effect=_close_coro_and_return_future, + ) as run_threadsafe: + adapter._on_reaction_event("im.message.reaction.created_v1", data) + run_threadsafe.assert_called_once() @patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True) def test_group_message_requires_mentions_even_when_policy_open(self): @@ -3278,3 +3251,231 @@ class TestSenderNameResolution(unittest.TestCase): result = asyncio.run(adapter._resolve_sender_name_from_api("ou_broken")) self.assertIsNone(result) + + +@unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed") +class TestProcessingReactions(unittest.TestCase): + """Typing on start → removed on SUCCESS, swapped for CrossMark on FAILURE, + removed (no replacement) on CANCELLED.""" + + @staticmethod + def _run(coro): + return asyncio.run(coro) + + def _build_adapter( + self, + create_success: bool = True, + delete_success: bool = True, + next_reaction_id: str = "r1", + ): + from gateway.config import PlatformConfig + from gateway.platforms.feishu import FeishuAdapter + + adapter = FeishuAdapter(PlatformConfig()) + 
tracker = SimpleNamespace( + create_calls=[], + delete_calls=[], + next_reaction_id=next_reaction_id, + create_success=create_success, + delete_success=delete_success, + ) + + def _create(request): + tracker.create_calls.append( + request.request_body.reaction_type["emoji_type"] + ) + if tracker.create_success: + return SimpleNamespace( + success=lambda: True, + data=SimpleNamespace(reaction_id=tracker.next_reaction_id), + ) + return SimpleNamespace( + success=lambda: False, code=99, msg="rejected", data=None, + ) + + def _delete(request): + tracker.delete_calls.append(request.reaction_id) + return SimpleNamespace( + success=lambda: tracker.delete_success, + code=0 if tracker.delete_success else 99, + msg="success" if tracker.delete_success else "rejected", + ) + + adapter._client = SimpleNamespace( + im=SimpleNamespace( + v1=SimpleNamespace( + message_reaction=SimpleNamespace(create=_create, delete=_delete), + ), + ), + ) + return adapter, tracker + + @staticmethod + def _event(message_id: str = "om_msg"): + return SimpleNamespace(message_id=message_id) + + def _patch_to_thread(self): + async def _direct(func, *args, **kwargs): + return func(*args, **kwargs) + + return patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct) + + # ------------------------------------------------------------------ start + @patch.dict(os.environ, {}, clear=True) + def test_start_adds_typing_and_caches_reaction_id(self): + adapter, tracker = self._build_adapter(next_reaction_id="r_typing") + with self._patch_to_thread(): + self._run(adapter.on_processing_start(self._event())) + self.assertEqual(tracker.create_calls, ["Typing"]) + self.assertEqual(adapter._pending_processing_reactions["om_msg"], "r_typing") + + @patch.dict(os.environ, {}, clear=True) + def test_start_is_idempotent_for_same_message_id(self): + adapter, tracker = self._build_adapter(next_reaction_id="r_typing") + with self._patch_to_thread(): + self._run(adapter.on_processing_start(self._event())) + 
self._run(adapter.on_processing_start(self._event())) + self.assertEqual(tracker.create_calls, ["Typing"]) + + @patch.dict(os.environ, {}, clear=True) + def test_start_does_not_cache_when_create_fails(self): + adapter, tracker = self._build_adapter(create_success=False) + with self._patch_to_thread(): + self._run(adapter.on_processing_start(self._event())) + self.assertEqual(tracker.create_calls, ["Typing"]) + self.assertNotIn("om_msg", adapter._pending_processing_reactions) + + # --------------------------------------------------------------- complete + @patch.dict(os.environ, {}, clear=True) + def test_success_removes_typing_and_adds_nothing(self): + adapter, tracker = self._build_adapter(next_reaction_id="r_typing") + with self._patch_to_thread(): + self._run(adapter.on_processing_start(self._event())) + self._run( + adapter.on_processing_complete(self._event(), ProcessingOutcome.SUCCESS) + ) + self.assertEqual(tracker.create_calls, ["Typing"]) + self.assertEqual(tracker.delete_calls, ["r_typing"]) + self.assertNotIn("om_msg", adapter._pending_processing_reactions) + + @patch.dict(os.environ, {}, clear=True) + def test_failure_removes_typing_then_adds_cross_mark(self): + adapter, tracker = self._build_adapter(next_reaction_id="r_typing") + with self._patch_to_thread(): + self._run(adapter.on_processing_start(self._event())) + self._run( + adapter.on_processing_complete(self._event(), ProcessingOutcome.FAILURE) + ) + self.assertEqual(tracker.create_calls, ["Typing", "CrossMark"]) + self.assertEqual(tracker.delete_calls, ["r_typing"]) + + @patch.dict(os.environ, {}, clear=True) + def test_cancelled_removes_typing_and_adds_nothing(self): + adapter, tracker = self._build_adapter(next_reaction_id="r_typing") + with self._patch_to_thread(): + self._run(adapter.on_processing_start(self._event())) + self._run( + adapter.on_processing_complete(self._event(), ProcessingOutcome.CANCELLED) + ) + self.assertEqual(tracker.create_calls, ["Typing"]) + 
self.assertEqual(tracker.delete_calls, ["r_typing"]) + self.assertNotIn("om_msg", adapter._pending_processing_reactions) + + @patch.dict(os.environ, {}, clear=True) + def test_failure_without_preceding_start_still_adds_cross_mark(self): + adapter, tracker = self._build_adapter() + with self._patch_to_thread(): + self._run( + adapter.on_processing_complete(self._event(), ProcessingOutcome.FAILURE) + ) + self.assertEqual(tracker.create_calls, ["CrossMark"]) + self.assertEqual(tracker.delete_calls, []) + + @patch.dict(os.environ, {}, clear=True) + def test_success_without_preceding_start_is_full_noop(self): + adapter, tracker = self._build_adapter() + with self._patch_to_thread(): + self._run( + adapter.on_processing_complete(self._event(), ProcessingOutcome.SUCCESS) + ) + self.assertEqual(tracker.create_calls, []) + self.assertEqual(tracker.delete_calls, []) + + # ------------------------- delete failure: don't stack badges ----------- + @patch.dict(os.environ, {}, clear=True) + def test_delete_failure_on_failure_outcome_skips_cross_mark(self): + # Removing Typing is best-effort — but if it fails, we must NOT + # additionally add CrossMark, or the UI would show two contradictory + # badges. The handle stays in the cache for LRU to clean up later. 
+ adapter, tracker = self._build_adapter( + next_reaction_id="r_typing", delete_success=False, + ) + with self._patch_to_thread(): + self._run(adapter.on_processing_start(self._event())) + self._run( + adapter.on_processing_complete(self._event(), ProcessingOutcome.FAILURE) + ) + self.assertEqual(tracker.create_calls, ["Typing"]) # CrossMark NOT added + self.assertEqual(tracker.delete_calls, ["r_typing"]) # delete was attempted + self.assertEqual( + adapter._pending_processing_reactions["om_msg"], "r_typing", + ) # handle retained + + @patch.dict(os.environ, {}, clear=True) + def test_delete_failure_on_success_outcome_retains_handle(self): + adapter, tracker = self._build_adapter( + next_reaction_id="r_typing", delete_success=False, + ) + with self._patch_to_thread(): + self._run(adapter.on_processing_start(self._event())) + self._run( + adapter.on_processing_complete(self._event(), ProcessingOutcome.SUCCESS) + ) + self.assertEqual(tracker.create_calls, ["Typing"]) + self.assertEqual(tracker.delete_calls, ["r_typing"]) + self.assertEqual( + adapter._pending_processing_reactions["om_msg"], "r_typing", + ) + + # ------------------------------------------------------------- env toggle + @patch.dict(os.environ, {"FEISHU_REACTIONS": "false"}, clear=True) + def test_env_disable_short_circuits_both_hooks(self): + adapter, tracker = self._build_adapter() + with self._patch_to_thread(): + self._run(adapter.on_processing_start(self._event())) + self._run( + adapter.on_processing_complete(self._event(), ProcessingOutcome.FAILURE) + ) + self.assertEqual(tracker.create_calls, []) + self.assertEqual(tracker.delete_calls, []) + + # ------------------------------------------------------------- LRU bounds + @patch.dict(os.environ, {}, clear=True) + def test_cache_evicts_oldest_entry_beyond_size_limit(self): + from gateway.platforms.feishu import _FEISHU_PROCESSING_REACTION_CACHE_SIZE + + adapter, _ = self._build_adapter() + counter = {"n": 0} + + def _create(_request): + 
counter["n"] += 1 + return SimpleNamespace( + success=lambda: True, + data=SimpleNamespace(reaction_id=f"r{counter['n']}"), + ) + + adapter._client.im.v1.message_reaction.create = _create + + with self._patch_to_thread(): + for i in range(_FEISHU_PROCESSING_REACTION_CACHE_SIZE + 1): + self._run(adapter.on_processing_start(self._event(f"om_{i}"))) + + self.assertNotIn("om_0", adapter._pending_processing_reactions) + self.assertIn( + f"om_{_FEISHU_PROCESSING_REACTION_CACHE_SIZE}", + adapter._pending_processing_reactions, + ) + self.assertEqual( + len(adapter._pending_processing_reactions), + _FEISHU_PROCESSING_REACTION_CACHE_SIZE, + ) diff --git a/website/docs/user-guide/messaging/feishu.md b/website/docs/user-guide/messaging/feishu.md index 6e9f1d0e7f..5b753be49f 100644 --- a/website/docs/user-guide/messaging/feishu.md +++ b/website/docs/user-guide/messaging/feishu.md @@ -335,13 +335,22 @@ If the Feishu API rejects the post payload (e.g., due to unsupported markdown co Plain text messages (no markdown detected) are sent as the simple `text` message type. -## ACK Emoji Reactions +## Processing Status Reactions -When the adapter receives an inbound message, it immediately adds an ✅ (OK) emoji reaction to signal that the message was received and is being processed. This provides visual feedback before the agent completes its response. +The adapter cycles a reaction on the user's message to signal what the agent is doing: -The reaction is persistent — it remains on the message after the response is sent, serving as a receipt marker. +| Phase | Reaction | +|-------|----------| +| Agent begins processing | `Typing` added | +| Processing succeeds | `Typing` removed (the reply message itself is the success signal) | +| Processing fails | `Typing` removed, `CrossMark` added | +| Processing is cancelled or interrupted | `Typing` removed (task aborted; no replacement badge) | -User reactions on bot messages are also tracked. 
If a user adds or removes an emoji reaction on a message sent by the bot, it is routed as a synthetic text event (`reaction:added:EMOJI_TYPE` or `reaction:removed:EMOJI_TYPE`) so the agent can respond to feedback. +Unlike Discord/Matrix, no positive badge is added on success — Feishu reactions render as prominent timeline badges and a per-message success marker would create visual noise. The absence of a badge, together with the reply message, is the success signal. + +Set `FEISHU_REACTIONS=false` (or `0` / `no`; the value is matched case-insensitively) to disable this entirely (e.g., for tenants where the bot lacks reaction permission, or where the noise is unwanted). Leaving the variable unset, or setting any other value, keeps reactions enabled. + +User reactions on bot messages are routed back to the agent as synthetic text events (`reaction:added:EMOJI_TYPE` or `reaction:removed:EMOJI_TYPE`). Only real user reactions are routed — bot/app-origin reactions (including the adapter's own `Typing`/`CrossMark`) are dropped to avoid feedback loops. ## Burst Protection and Batching