diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py index fdfdd78b0..01b1c3a14 100644 --- a/gateway/platforms/feishu.py +++ b/gateway/platforms/feishu.py @@ -72,7 +72,10 @@ try: UpdateMessageRequestBody, ) from lark_oapi.core.const import FEISHU_DOMAIN, LARK_DOMAIN - from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse + from lark_oapi.event.callback.model.p2_card_action_trigger import ( + CallBackCard, + P2CardActionTriggerResponse, + ) from lark_oapi.event.dispatcher_handler import EventDispatcherHandler from lark_oapi.ws import Client as FeishuWSClient @@ -80,6 +83,7 @@ try: except ImportError: FEISHU_AVAILABLE = False lark = None # type: ignore[assignment] + CallBackCard = None # type: ignore[assignment] P2CardActionTriggerResponse = None # type: ignore[assignment] EventDispatcherHandler = None # type: ignore[assignment] FeishuWSClient = None # type: ignore[assignment] @@ -169,6 +173,19 @@ _FEISHU_WEBHOOK_BODY_TIMEOUT_SECONDS = 30 # max seconds to read request _FEISHU_WEBHOOK_ANOMALY_THRESHOLD = 25 # consecutive error responses before WARNING log _FEISHU_WEBHOOK_ANOMALY_TTL_SECONDS = 6 * 60 * 60 # anomaly tracker TTL (6 hours) — matches openclaw _FEISHU_CARD_ACTION_DEDUP_TTL_SECONDS = 15 * 60 # card action token dedup window (15 min) + +_APPROVAL_CHOICE_MAP: Dict[str, str] = { + "approve_once": "once", + "approve_session": "session", + "approve_always": "always", + "deny": "deny", +} +_APPROVAL_LABEL_MAP: Dict[str, str] = { + "once": "Approved once", + "session": "Approved for session", + "always": "Approved permanently", + "deny": "Denied", +} _FEISHU_BOT_MSG_TRACK_SIZE = 512 # LRU size for tracking sent message IDs _FEISHU_REPLY_FALLBACK_CODES = frozenset({230011, 231003}) # reply target withdrawn/missing → create fallback _FEISHU_ACK_EMOJI = "OK" @@ -1490,14 +1507,12 @@ class FeishuAdapter(BasePlatformAdapter): logger.warning("[Feishu] send_exec_approval failed: %s", exc) return SendResult(success=False, error=str(exc)) - async def _update_approval_card( - self, message_id: str, label: str, user_name: str, choice: str, - ) -> None: - """Replace the approval card with a resolved status card.""" - if not self._client or not message_id: - return + @staticmethod + def _build_resolved_approval_card(*, choice: str, user_name: str) -> Dict[str, Any]: + """Build raw card JSON for a resolved approval action.""" icon = "❌" if choice == "deny" else "✅" - card = { + label = _APPROVAL_LABEL_MAP.get(choice, "Resolved") + return { "config": {"wide_screen_mode": True}, "header": { "title": {"content": f"{icon} {label}", "tag": "plain_text"}, @@ -1510,13 +1525,6 @@ class FeishuAdapter(BasePlatformAdapter): }, ], } - try: - payload = json.dumps(card, ensure_ascii=False) - body = self._build_update_message_body(msg_type="interactive", content=payload) - request = self._build_update_message_request(message_id=message_id, request_body=body) - await asyncio.to_thread(self._client.im.v1.message.update, request) - except Exception as exc: - logger.warning("[Feishu] Failed to update approval card %s: %s", message_id, exc) async def send_voice( self, @@ -1845,20 +1853,82 @@ class FeishuAdapter(BasePlatformAdapter): future.add_done_callback(self._log_background_failure) def _on_card_action_trigger(self, data: Any) -> Any: - """Schedule Feishu card actions on the adapter loop and acknowledge immediately.""" + """Handle card-action callback from the Feishu SDK (synchronous). + + For approval actions: parses the event once, returns the resolved card + inline (the only reliable way to sync all clients), and schedules a + lightweight async method to actually unblock the agent. + + For other card actions: delegates to ``_handle_card_action_event``. + """ loop = self._loop - if loop is None or bool(getattr(loop, "is_closed", lambda: False)()): + if not self._loop_accepts_callbacks(loop): logger.warning("[Feishu] Dropping card action before adapter loop is ready") - else: - future = asyncio.run_coroutine_threadsafe( - self._handle_card_action_event(data), - loop, - ) - future.add_done_callback(self._log_background_failure) + return P2CardActionTriggerResponse() if P2CardActionTriggerResponse else None + + event = getattr(data, "event", None) + action = getattr(event, "action", None) + action_value = getattr(action, "value", {}) or {} + hermes_action = action_value.get("hermes_action") if isinstance(action_value, dict) else None + + if hermes_action: + return self._handle_approval_card_action(event=event, action_value=action_value, loop=loop) + + self._submit_on_loop(loop, self._handle_card_action_event(data)) if P2CardActionTriggerResponse is None: return None return P2CardActionTriggerResponse() + @staticmethod + def _loop_accepts_callbacks(loop: Any) -> bool: + """Return True when the adapter loop can accept thread-safe submissions.""" + return loop is not None and not bool(getattr(loop, "is_closed", lambda: False)()) + + def _submit_on_loop(self, loop: Any, coro: Any) -> None: + """Schedule background work on the adapter loop with shared failure logging.""" + future = asyncio.run_coroutine_threadsafe(coro, loop) + future.add_done_callback(self._log_background_failure) + + def _handle_approval_card_action(self, *, event: Any, action_value: Dict[str, Any], loop: Any) -> Any: + """Schedule approval resolution and build the synchronous callback response.""" + approval_id = action_value.get("approval_id") + if approval_id is None: + logger.debug("[Feishu] Card action missing approval_id, ignoring") + return P2CardActionTriggerResponse() if P2CardActionTriggerResponse else None + choice = _APPROVAL_CHOICE_MAP.get(action_value.get("hermes_action"), "deny") + + operator = getattr(event, "operator", None) + open_id = str(getattr(operator, "open_id", "") or "") + user_name = self._get_cached_sender_name(open_id) or open_id + + self._submit_on_loop(loop, self._resolve_approval(approval_id, choice, user_name)) + + if P2CardActionTriggerResponse is None: + return None + response = P2CardActionTriggerResponse() + if CallBackCard is not None: + card = CallBackCard() + card.type = "raw" + card.data = self._build_resolved_approval_card(choice=choice, user_name=user_name) + response.card = card + return response + + async def _resolve_approval(self, approval_id: Any, choice: str, user_name: str) -> None: + """Pop approval state and unblock the waiting agent thread.""" + state = self._approval_state.pop(approval_id, None) + if not state: + logger.debug("[Feishu] Approval %s already resolved or unknown", approval_id) + return + try: + from tools.approval import resolve_gateway_approval + count = resolve_gateway_approval(state["session_key"], choice) + logger.info( + "Feishu button resolved %d approval(s) for session %s (choice=%s, user=%s)", + count, state["session_key"], choice, user_name, + ) + except Exception as exc: + logger.error("Failed to resolve gateway approval from Feishu button: %s", exc) + async def _handle_reaction_event(self, event_type: str, data: Any) -> None: """Fetch the reacted-to message; if it was sent by this bot, emit a synthetic text event.""" if not self._client: @@ -1950,51 +2020,6 @@ class FeishuAdapter(BasePlatformAdapter): action_tag = str(getattr(action, "tag", "") or "button") action_value = getattr(action, "value", {}) or {} - # --- Exec approval button intercept --- - hermes_action = action_value.get("hermes_action") if isinstance(action_value, dict) else None - if hermes_action: - approval_id = action_value.get("approval_id") - state = self._approval_state.pop(approval_id, None) - if not state: - logger.debug("[Feishu] Approval %s already resolved or unknown", approval_id) - return - - choice_map = { - "approve_once": "once", - "approve_session": "session", - "approve_always": "always", - "deny": "deny", - } - choice = choice_map.get(hermes_action, "deny") - - label_map = { - "once": "Approved once", - "session": "Approved for session", - "always": "Approved permanently", - "deny": "Denied", - } - label = label_map.get(choice, "Resolved") - - # Resolve sender name for the status card - sender_id = SimpleNamespace(open_id=open_id, user_id=None, union_id=None) - sender_profile = await self._resolve_sender_profile(sender_id) - user_name = sender_profile.get("user_name") or open_id - - # Resolve the approval — unblocks the agent thread - try: - from tools.approval import resolve_gateway_approval - count = resolve_gateway_approval(state["session_key"], choice) - logger.info( - "Feishu button resolved %d approval(s) for session %s (choice=%s, user=%s)", - count, state["session_key"], choice, user_name, - ) - except Exception as exc: - logger.error("Failed to resolve gateway approval from Feishu button: %s", exc) - - # Update the card to show the decision - await self._update_approval_card(state.get("message_id", ""), label, user_name, choice) - return - synthetic_text = f"/card {action_tag}" if action_value: try: @@ -2897,6 +2922,19 @@ class FeishuAdapter(BasePlatformAdapter): "user_id_alt": union_id, } + def _get_cached_sender_name(self, sender_id: Optional[str]) -> Optional[str]: + """Return a cached sender name only while its TTL is still valid.""" + if not sender_id: + return None + cached = self._sender_name_cache.get(sender_id) + if cached is None: + return None + name, expire_at = cached + if time.time() < expire_at: + return name + self._sender_name_cache.pop(sender_id, None) + return None + async def _resolve_sender_name_from_api(self, sender_id: Optional[str]) -> Optional[str]: """Fetch the sender's display name from the Feishu contact API with a 10-minute cache. @@ -2909,11 +2947,9 @@ class FeishuAdapter(BasePlatformAdapter): if not trimmed: return None now = time.time() - cached = self._sender_name_cache.get(trimmed) - if cached is not None: - name, expire_at = cached - if now < expire_at: - return name + cached_name = self._get_cached_sender_name(trimmed) + if cached_name is not None: + return cached_name try: from lark_oapi.api.contact.v3 import GetUserRequest # lazy import if trimmed.startswith("ou_"): diff --git a/tests/gateway/test_feishu_approval_buttons.py b/tests/gateway/test_feishu_approval_buttons.py index 9c51d1ac4..954e9c061 100644 --- a/tests/gateway/test_feishu_approval_buttons.py +++ b/tests/gateway/test_feishu_approval_buttons.py @@ -1,12 +1,11 @@ """Tests for Feishu interactive card approval buttons.""" -import asyncio +import importlib.util import json -import os import sys from pathlib import Path from types import SimpleNamespace -from unittest.mock import AsyncMock, MagicMock, Mock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -23,14 +22,14 @@ if _repo not in sys.path: # --------------------------------------------------------------------------- def _ensure_feishu_mocks(): """Provide stubs for lark-oapi / aiohttp.web so the import succeeds.""" - if "lark_oapi" not in sys.modules: + if importlib.util.find_spec("lark_oapi") is None and "lark_oapi" not in sys.modules: mod = MagicMock() for name in ( "lark_oapi", "lark_oapi.api.im.v1", "lark_oapi.event", "lark_oapi.event.callback_type", ): sys.modules.setdefault(name, mod) - if "aiohttp" not in sys.modules: + if importlib.util.find_spec("aiohttp") is None and "aiohttp" not in sys.modules: aio = MagicMock() sys.modules.setdefault("aiohttp", aio) sys.modules.setdefault("aiohttp.web", aio.web) @@ -39,6 +38,7 @@ def _ensure_feishu_mocks(): _ensure_feishu_mocks() from gateway.config import PlatformConfig +import gateway.platforms.feishu as feishu_module from gateway.platforms.feishu import FeishuAdapter @@ -74,6 +74,12 @@ def _make_card_action_data( ) +def _close_submitted_coro(coro, _loop): + """Close scheduled coroutines in sync-handler tests to avoid unawaited warnings.""" + coro.close() + return SimpleNamespace(add_done_callback=lambda *_args, **_kwargs: None) + + # =========================================================================== # send_exec_approval — interactive card with buttons # =========================================================================== @@ -203,14 +209,14 @@ class TestFeishuExecApproval: # =========================================================================== -# _handle_card_action_event — approval button clicks +# _resolve_approval — approval state pop + gateway resolution # =========================================================================== -class TestFeishuApprovalCallback: - """Test the approval intercept in _handle_card_action_event.""" +class TestResolveApproval: + """Test _resolve_approval pops state and calls resolve_gateway_approval.""" @pytest.mark.asyncio - async def test_resolves_approval_on_click(self): + async def test_resolves_once(self): adapter = _make_adapter() adapter._approval_state[1] = { "session_key": "agent:main:feishu:group:oc_12345", @@ -218,28 +224,14 @@ class TestFeishuApprovalCallback: "chat_id": "oc_12345", } - data = _make_card_action_data( - action_value={"hermes_action": "approve_once", "approval_id": 1}, - ) - - with ( - patch.object( - adapter, "_resolve_sender_profile", new_callable=AsyncMock, - return_value={"user_id": "ou_user1", "user_name": "Norbert", "user_id_alt": None}, - ), - patch.object(adapter, "_update_approval_card", new_callable=AsyncMock) as mock_update, - patch("tools.approval.resolve_gateway_approval", return_value=1) as mock_resolve, - ): - await adapter._handle_card_action_event(data) + with patch("tools.approval.resolve_gateway_approval", return_value=1) as mock_resolve: + await adapter._resolve_approval(1, "once", "Norbert") mock_resolve.assert_called_once_with("agent:main:feishu:group:oc_12345", "once") - mock_update.assert_called_once_with("msg_001", "Approved once", "Norbert", "once") - - # State should be cleaned up assert 1 not in adapter._approval_state @pytest.mark.asyncio - async def test_deny_button(self): + async def test_resolves_deny(self): adapter = _make_adapter() adapter._approval_state[2] = { "session_key": "some-session", @@ -247,26 +239,13 @@ class TestFeishuApprovalCallback: "chat_id": "oc_12345", } - data = _make_card_action_data( - action_value={"hermes_action": "deny", "approval_id": 2}, - token="tok_deny", - ) - - with ( - patch.object( - adapter, "_resolve_sender_profile", new_callable=AsyncMock, - return_value={"user_id": "ou_alice", "user_name": "Alice", "user_id_alt": None}, - ), - patch.object(adapter, "_update_approval_card", new_callable=AsyncMock) as mock_update, - patch("tools.approval.resolve_gateway_approval", return_value=1) as mock_resolve, - ): - await adapter._handle_card_action_event(data) + with patch("tools.approval.resolve_gateway_approval", return_value=1) as mock_resolve: + await adapter._resolve_approval(2, "deny", "Alice") mock_resolve.assert_called_once_with("some-session", "deny") - mock_update.assert_called_once_with("msg_002", "Denied", "Alice", "deny") @pytest.mark.asyncio - async def test_session_approval(self): + async def test_resolves_session(self): adapter = _make_adapter() adapter._approval_state[3] = { "session_key": "sess-3", @@ -274,26 +253,13 @@ class TestFeishuApprovalCallback: "chat_id": "oc_99", } - data = _make_card_action_data( - action_value={"hermes_action": "approve_session", "approval_id": 3}, - token="tok_ses", - ) - - with ( - patch.object( - adapter, "_resolve_sender_profile", new_callable=AsyncMock, - return_value={"user_id": "ou_u", "user_name": "Bob", "user_id_alt": None}, - ), - patch.object(adapter, "_update_approval_card", new_callable=AsyncMock) as mock_update, - patch("tools.approval.resolve_gateway_approval", return_value=1) as mock_resolve, - ): - await adapter._handle_card_action_event(data) + with patch("tools.approval.resolve_gateway_approval", return_value=1) as mock_resolve: + await adapter._resolve_approval(3, "session", "Bob") mock_resolve.assert_called_once_with("sess-3", "session") - mock_update.assert_called_once_with("msg_003", "Approved for session", "Bob", "session") @pytest.mark.asyncio - async def test_always_approval(self): + async def test_resolves_always(self): adapter = _make_adapter() adapter._approval_state[4] = { "session_key": "sess-4", @@ -301,42 +267,29 @@ class TestFeishuApprovalCallback: "chat_id": "oc_55", } - data = _make_card_action_data( - action_value={"hermes_action": "approve_always", "approval_id": 4}, - token="tok_alw", - ) - - with ( - patch.object( - adapter, "_resolve_sender_profile", new_callable=AsyncMock, - return_value={"user_id": "ou_u", "user_name": "Carol", "user_id_alt": None}, - ), - patch.object(adapter, "_update_approval_card", new_callable=AsyncMock), - patch("tools.approval.resolve_gateway_approval", return_value=1) as mock_resolve, - ): - await adapter._handle_card_action_event(data) + with patch("tools.approval.resolve_gateway_approval", return_value=1) as mock_resolve: + await adapter._resolve_approval(4, "always", "Carol") mock_resolve.assert_called_once_with("sess-4", "always") @pytest.mark.asyncio async def test_already_resolved_drops_silently(self): adapter = _make_adapter() - # No state for approval_id 99 — already resolved - - data = _make_card_action_data( - action_value={"hermes_action": "approve_once", "approval_id": 99}, - token="tok_gone", - ) with patch("tools.approval.resolve_gateway_approval") as mock_resolve: - await adapter._handle_card_action_event(data) + await adapter._resolve_approval(99, "once", "Nobody") - # Should NOT resolve — already handled mock_resolve.assert_not_called() +# =========================================================================== +# _handle_card_action_event — non-approval card actions +# =========================================================================== + +class TestNonApprovalCardAction: + """Non-approval card actions should still route as synthetic commands.""" + @pytest.mark.asyncio - async def test_non_approval_actions_route_normally(self): - """Non-approval card actions should still become synthetic commands.""" + async def test_routes_as_synthetic_command(self): adapter = _make_adapter() data = _make_card_action_data( @@ -351,82 +304,141 @@ class TestFeishuApprovalCallback: ), patch.object(adapter, "get_chat_info", new_callable=AsyncMock, return_value={"name": "Test Chat"}), patch.object(adapter, "_handle_message_with_guards", new_callable=AsyncMock) as mock_handle, - patch("tools.approval.resolve_gateway_approval") as mock_resolve, ): await adapter._handle_card_action_event(data) - # Should NOT resolve any approval - mock_resolve.assert_not_called() - # Should have routed as synthetic command mock_handle.assert_called_once() event = mock_handle.call_args[0][0] assert "/card button" in event.text # =========================================================================== -# _update_approval_card — card replacement after resolution +# _on_card_action_trigger — inline card response for approval actions # =========================================================================== -class TestFeishuUpdateApprovalCard: - """Test the card update after approval resolution.""" +class _FakeCallBackCard: + def __init__(self): + self.type = None + self.data = None - @pytest.mark.asyncio - async def test_updates_card_on_approve(self): + +class _FakeP2Response: + def __init__(self): + self.card = None + + +@pytest.fixture(autouse=False) +def _patch_callback_card_types(monkeypatch): + """Provide real-ish P2CardActionTriggerResponse / CallBackCard for tests.""" + monkeypatch.setattr(feishu_module, "P2CardActionTriggerResponse", _FakeP2Response) + monkeypatch.setattr(feishu_module, "CallBackCard", _FakeCallBackCard) + + +class TestCardActionCallbackResponse: + """Test that _on_card_action_trigger returns updated card inline.""" + + def test_drops_action_when_loop_not_ready(self, _patch_callback_card_types): adapter = _make_adapter() + adapter._loop = None + data = _make_card_action_data({"hermes_action": "approve_once", "approval_id": 1}) - mock_update = AsyncMock() - adapter._client.im.v1.message.update = MagicMock() + with patch("asyncio.run_coroutine_threadsafe") as mock_submit: + response = adapter._on_card_action_trigger(data) - with patch("asyncio.to_thread", new_callable=AsyncMock) as mock_thread: - await adapter._update_approval_card( - "msg_001", "Approved once", "Norbert", "once" - ) + assert response is not None + assert response.card is None + mock_submit.assert_not_called() - mock_thread.assert_called_once() - # Verify the update request was built - call_args = mock_thread.call_args - assert call_args[0][0] == adapter._client.im.v1.message.update - - @pytest.mark.asyncio - async def test_updates_card_on_deny(self): + def test_returns_card_for_approve_action(self, _patch_callback_card_types): adapter = _make_adapter() + adapter._loop = MagicMock() + adapter._loop.is_closed = MagicMock(return_value=False) + data = _make_card_action_data( + {"hermes_action": "approve_once", "approval_id": 1}, + open_id="ou_bob", + ) + adapter._sender_name_cache["ou_bob"] = ("Bob", 9999999999) - with patch("asyncio.to_thread", new_callable=AsyncMock) as mock_thread: - await adapter._update_approval_card( - "msg_002", "Denied", "Alice", "deny" - ) + with patch("asyncio.run_coroutine_threadsafe", side_effect=_close_submitted_coro): + response = adapter._on_card_action_trigger(data) - mock_thread.assert_called_once() + assert response is not None + assert response.card is not None + assert response.card.type == "raw" + card = response.card.data + assert card["header"]["template"] == "green" + assert "Approved once" in card["header"]["title"]["content"] + assert "Bob" in card["elements"][0]["content"] - @pytest.mark.asyncio - async def test_skips_update_when_not_connected(self): + def test_returns_card_for_deny_action(self, _patch_callback_card_types): adapter = _make_adapter() - adapter._client = None + adapter._loop = MagicMock() + adapter._loop.is_closed = MagicMock(return_value=False) + data = _make_card_action_data( + {"hermes_action": "deny", "approval_id": 2}, + ) - with patch("asyncio.to_thread", new_callable=AsyncMock) as mock_thread: - await adapter._update_approval_card( - "msg_001", "Approved", "Bob", "once" - ) + with patch("asyncio.run_coroutine_threadsafe", side_effect=_close_submitted_coro): + response = adapter._on_card_action_trigger(data) - mock_thread.assert_not_called() + assert response.card is not None + card = response.card.data + assert card["header"]["template"] == "red" + assert "Denied" in card["header"]["title"]["content"] - @pytest.mark.asyncio - async def test_skips_update_when_no_message_id(self): + def test_ignores_missing_approval_id(self, _patch_callback_card_types): adapter = _make_adapter() + adapter._loop = MagicMock() + adapter._loop.is_closed = MagicMock(return_value=False) + data = _make_card_action_data({"hermes_action": "approve_once"}) - with patch("asyncio.to_thread", new_callable=AsyncMock) as mock_thread: - await adapter._update_approval_card( - "", "Approved", "Bob", "once" - ) + with patch("asyncio.run_coroutine_threadsafe") as mock_submit: + response = adapter._on_card_action_trigger(data) - mock_thread.assert_not_called() + assert response is not None + assert response.card is None + mock_submit.assert_not_called() - @pytest.mark.asyncio - async def test_swallows_update_errors(self): + def test_no_card_for_non_approval_action(self, _patch_callback_card_types): adapter = _make_adapter() + adapter._loop = MagicMock() + adapter._loop.is_closed = MagicMock(return_value=False) + data = _make_card_action_data({"some_other": "value"}) - with patch("asyncio.to_thread", new_callable=AsyncMock, side_effect=Exception("API error")): - # Should not raise - await adapter._update_approval_card( - "msg_001", "Approved", "Bob", "once" - ) + with patch("asyncio.run_coroutine_threadsafe", side_effect=_close_submitted_coro): + response = adapter._on_card_action_trigger(data) + + assert response is not None + assert response.card is None + + def test_falls_back_to_open_id_when_name_not_cached(self, _patch_callback_card_types): + adapter = _make_adapter() + adapter._loop = MagicMock() + adapter._loop.is_closed = MagicMock(return_value=False) + data = _make_card_action_data( + {"hermes_action": "approve_session", "approval_id": 3}, + open_id="ou_unknown", + ) + + with patch("asyncio.run_coroutine_threadsafe", side_effect=_close_submitted_coro): + response = adapter._on_card_action_trigger(data) + + card = response.card.data + assert "ou_unknown" in card["elements"][0]["content"] + + def test_ignores_expired_cached_name(self, _patch_callback_card_types): + adapter = _make_adapter() + adapter._loop = MagicMock() + adapter._loop.is_closed = MagicMock(return_value=False) + data = _make_card_action_data( + {"hermes_action": "approve_once", "approval_id": 4}, + open_id="ou_expired", + ) + adapter._sender_name_cache["ou_expired"] = ("Old Name", 1) + + with patch("asyncio.run_coroutine_threadsafe", side_effect=_close_submitted_coro): + response = adapter._on_card_action_trigger(data) + + card = response.card.data + assert "Old Name" not in card["elements"][0]["content"] + assert "ou_expired" in card["elements"][0]["content"]