diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py
index 01b1c3a14..6b62e46ad 100644
--- a/gateway/platforms/feishu.py
+++ b/gateway/platforms/feishu.py
@@ -1073,6 +1073,13 @@ class FeishuAdapter(BasePlatformAdapter):
         self._webhook_rate_counts: Dict[str, tuple[int, float]] = {}  # rate_key → (count, window_start)
         self._webhook_anomaly_counts: Dict[str, tuple[int, str, float]] = {}  # ip → (count, last_status, first_seen)
         self._card_action_tokens: Dict[str, float] = {}  # token → first_seen_time
+        # Inbound events that arrived before the adapter loop was ready
+        # (e.g. during startup/restart or network-flap reconnect). A single
+        # drainer thread replays them as soon as the loop becomes available.
+        self._pending_inbound_events: List[Any] = []
+        self._pending_inbound_lock = threading.Lock()
+        self._pending_drain_scheduled = False
+        self._pending_inbound_max_depth = 1000  # cap queue; drop oldest beyond
         self._chat_locks: Dict[str, asyncio.Lock] = {}  # chat_id → lock (per-chat serial processing)
         self._sent_message_ids_to_chat: Dict[str, str] = {}  # message_id → chat_id (for reaction routing)
         self._sent_message_id_order: List[str] = []  # LRU order for _sent_message_ids_to_chat
@@ -1757,10 +1764,22 @@ class FeishuAdapter(BasePlatformAdapter):
     # =========================================================================
 
     def _on_message_event(self, data: Any) -> None:
-        """Normalize Feishu inbound events into MessageEvent."""
+        """Normalize Feishu inbound events into MessageEvent.
+
+        Called by the lark_oapi SDK's event dispatcher on a background thread.
+        If the adapter loop is not currently accepting callbacks (brief window
+        during startup/restart or network-flap reconnect), the event is queued
+        for replay instead of dropped.
+        """
         loop = self._loop
-        if loop is None or bool(getattr(loop, "is_closed", lambda: False)()):
-            logger.warning("[Feishu] Dropping inbound message before adapter loop is ready")
+        if not self._loop_accepts_callbacks(loop):
+            start_drainer = self._enqueue_pending_inbound_event(data)
+            if start_drainer:
+                threading.Thread(
+                    target=self._drain_pending_inbound_events,
+                    name="feishu-pending-inbound-drainer",
+                    daemon=True,
+                ).start()
             return
         future = asyncio.run_coroutine_threadsafe(
             self._handle_message_event_data(data),
@@ -1768,6 +1787,140 @@ class FeishuAdapter(BasePlatformAdapter):
         )
         future.add_done_callback(self._log_background_failure)
 
+    def _enqueue_pending_inbound_event(self, data: Any) -> bool:
+        """Append an event to the pending-inbound queue.
+
+        Returns True if the caller should spawn a drainer thread (no drainer
+        currently scheduled), False if a drainer is already running and will
+        pick up the new event on its next pass.
+        """
+        with self._pending_inbound_lock:
+            if len(self._pending_inbound_events) >= self._pending_inbound_max_depth:
+                # Queue full — drop the oldest to make room. This happens only
+                # if the loop stays unavailable for an extended period AND the
+                # WS keeps firing callbacks. Still better than silent drops.
+                dropped = self._pending_inbound_events.pop(0)
+                try:
+                    event = getattr(dropped, "event", None)
+                    message = getattr(event, "message", None)
+                    message_id = str(getattr(message, "message_id", "") or "unknown")
+                except Exception:
+                    message_id = "unknown"
+                logger.error(
+                    "[Feishu] Pending-inbound queue full (%d); dropped oldest event %s",
+                    self._pending_inbound_max_depth,
+                    message_id,
+                )
+            self._pending_inbound_events.append(data)
+            depth = len(self._pending_inbound_events)
+            should_start = not self._pending_drain_scheduled
+            if should_start:
+                self._pending_drain_scheduled = True
+        logger.warning(
+            "[Feishu] Queued inbound event for replay (loop not ready, queue depth=%d)",
+            depth,
+        )
+        return should_start
+
+    def _drain_pending_inbound_events(self) -> None:
+        """Replay queued inbound events once the adapter loop is ready.
+
+        Runs in a dedicated daemon thread. Polls ``_running`` and
+        ``_loop_accepts_callbacks`` until events can be dispatched or the
+        adapter shuts down. A single drainer handles the entire queue;
+        concurrent ``_on_message_event`` calls just append.
+        """
+        poll_interval = 0.25
+        max_wait_seconds = 120.0  # safety cap: drop queue after 2 minutes
+        waited = 0.0
+        try:
+            while True:
+                if not getattr(self, "_running", True):
+                    # Adapter shutting down — drop queued events rather than
+                    # holding them against a closed loop.
+                    with self._pending_inbound_lock:
+                        dropped = len(self._pending_inbound_events)
+                        self._pending_inbound_events.clear()
+                    if dropped:
+                        logger.warning(
+                            "[Feishu] Dropped %d queued inbound event(s) during shutdown",
+                            dropped,
+                        )
+                    return
+                loop = self._loop
+                if self._loop_accepts_callbacks(loop):
+                    with self._pending_inbound_lock:
+                        batch = self._pending_inbound_events[:]
+                        self._pending_inbound_events.clear()
+                    if not batch:
+                        # Queue emptied between check and grab; done.
+                        with self._pending_inbound_lock:
+                            if not self._pending_inbound_events:
+                                return
+                        continue
+                    dispatched = 0
+                    requeue: List[Any] = []
+                    for event in batch:
+                        try:
+                            fut = asyncio.run_coroutine_threadsafe(
+                                self._handle_message_event_data(event),
+                                loop,
+                            )
+                            fut.add_done_callback(self._log_background_failure)
+                            dispatched += 1
+                        except RuntimeError:
+                            # Loop closed between check and submit — requeue
+                            # and poll again.
+                            requeue.append(event)
+                    if requeue:
+                        with self._pending_inbound_lock:
+                            self._pending_inbound_events[:0] = requeue
+                    if dispatched:
+                        logger.info(
+                            "[Feishu] Replayed %d queued inbound event(s)",
+                            dispatched,
+                        )
+                    if not requeue:
+                        # Successfully drained; check if more arrived while
+                        # we were dispatching and exit if not.
+                        with self._pending_inbound_lock:
+                            if not self._pending_inbound_events:
+                                return
+                    # More events queued or requeue pending — loop again.
+                    continue
+                if waited >= max_wait_seconds:
+                    with self._pending_inbound_lock:
+                        dropped = len(self._pending_inbound_events)
+                        self._pending_inbound_events.clear()
+                    logger.error(
+                        "[Feishu] Adapter loop unavailable for %.0fs; "
+                        "dropped %d queued inbound event(s)",
+                        max_wait_seconds,
+                        dropped,
+                    )
+                    return
+                time.sleep(poll_interval)
+                waited += poll_interval
+        finally:
+            # Reset the drain flag, and close the race where an event is
+            # enqueued after our final empty-check but before this reset:
+            # such an event would see the flag still True, skip spawning a
+            # drainer, and sit queued until the next event arrived. Checking
+            # the queue under the same lock as the reset makes the handoff
+            # atomic.
+            respawn = False
+            with self._pending_inbound_lock:
+                self._pending_drain_scheduled = False
+                if self._pending_inbound_events and getattr(self, "_running", True):
+                    self._pending_drain_scheduled = True
+                    respawn = True
+            if respawn:
+                threading.Thread(
+                    target=self._drain_pending_inbound_events,
+                    name="feishu-pending-inbound-drainer",
+                    daemon=True,
+                ).start()
+
     async def _handle_message_event_data(self, data: Any) -> None:
         """Shared inbound message handling for websocket and webhook transports."""
         event = getattr(data, "event", None)
diff --git a/scripts/release.py b/scripts/release.py
index a85e947ae..5af14ca53 100755
--- a/scripts/release.py
+++ b/scripts/release.py
@@ -53,6 +53,7 @@ AUTHOR_MAP = {
     "126368201+vilkasdev@users.noreply.github.com": "vilkasdev",
     "137614867+cutepawss@users.noreply.github.com": "cutepawss",
     "96793918+memosr@users.noreply.github.com": "memosr",
+    "milkoor@users.noreply.github.com": "milkoor",
    "131039422+SHL0MS@users.noreply.github.com": "SHL0MS",
     "77628552+raulvidis@users.noreply.github.com": "raulvidis",
     "145567217+Aum08Desai@users.noreply.github.com": "Aum08Desai",
diff --git a/tests/gateway/test_feishu.py b/tests/gateway/test_feishu.py
index 7b23a6985..4d9a7b843 100644
--- a/tests/gateway/test_feishu.py
+++ b/tests/gateway/test_feishu.py
@@ -2536,6 +2536,152 @@ class TestAdapterBehavior(unittest.TestCase):
         )
 
 
+@unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed")
+class TestPendingInboundQueue(unittest.TestCase):
+    """Tests for the loop-not-ready race (#5499): inbound events arriving
+    before or during adapter loop transitions must be queued for replay
+    rather than silently dropped."""
+
+    @patch.dict(os.environ, {}, clear=True)
+    def test_event_queued_when_loop_not_ready(self):
+        from gateway.config import PlatformConfig
+        from gateway.platforms.feishu import FeishuAdapter
+
+        adapter = FeishuAdapter(PlatformConfig())
+        adapter._loop = None  # Simulate "before start()" or "during reconnect"
+
+        with patch("gateway.platforms.feishu.threading.Thread") as thread_cls:
+            adapter._on_message_event(SimpleNamespace(tag="evt-1"))
+            adapter._on_message_event(SimpleNamespace(tag="evt-2"))
+            adapter._on_message_event(SimpleNamespace(tag="evt-3"))
+
+        # All three queued, none dropped.
+        self.assertEqual(len(adapter._pending_inbound_events), 3)
+        # Only ONE drainer thread scheduled, not one per event.
+        self.assertEqual(thread_cls.call_count, 1)
+        # Drain scheduled flag set.
+        self.assertTrue(adapter._pending_drain_scheduled)
+
+    @patch.dict(os.environ, {}, clear=True)
+    def test_drainer_replays_queued_events_when_loop_becomes_ready(self):
+        from gateway.config import PlatformConfig
+        from gateway.platforms.feishu import FeishuAdapter
+
+        adapter = FeishuAdapter(PlatformConfig())
+        adapter._loop = None
+        adapter._running = True
+
+        class _ReadyLoop:
+            def is_closed(self):
+                return False
+
+        # Queue three events while loop is None (simulate the race).
+        events = [SimpleNamespace(tag=f"evt-{i}") for i in range(3)]
+        with patch("gateway.platforms.feishu.threading.Thread"):
+            for ev in events:
+                adapter._on_message_event(ev)
+
+        self.assertEqual(len(adapter._pending_inbound_events), 3)
+
+        # Now the loop becomes ready; run the drainer inline (not as a thread)
+        # to verify it replays the queue.
+        adapter._loop = _ReadyLoop()
+
+        future = SimpleNamespace(add_done_callback=lambda *_a, **_kw: None)
+        submitted: list = []
+
+        def _submit(coro, _loop):
+            submitted.append(coro)
+            coro.close()
+            return future
+
+        with patch(
+            "gateway.platforms.feishu.asyncio.run_coroutine_threadsafe",
+            side_effect=_submit,
+        ) as submit:
+            adapter._drain_pending_inbound_events()
+
+        # All three events dispatched to the loop.
+        self.assertEqual(submit.call_count, 3)
+        # Queue emptied.
+        self.assertEqual(len(adapter._pending_inbound_events), 0)
+        # Drain flag reset so a future race can schedule a new drainer.
+        self.assertFalse(adapter._pending_drain_scheduled)
+
+    @patch.dict(os.environ, {}, clear=True)
+    def test_drainer_drops_queue_when_adapter_shuts_down(self):
+        from gateway.config import PlatformConfig
+        from gateway.platforms.feishu import FeishuAdapter
+
+        adapter = FeishuAdapter(PlatformConfig())
+        adapter._loop = None
+        adapter._running = False  # Shutdown state
+
+        with patch("gateway.platforms.feishu.threading.Thread"):
+            adapter._on_message_event(SimpleNamespace(tag="evt-lost"))
+
+        self.assertEqual(len(adapter._pending_inbound_events), 1)
+
+        # Drainer should drop the queue immediately since _running is False.
+        adapter._drain_pending_inbound_events()
+
+        self.assertEqual(len(adapter._pending_inbound_events), 0)
+        self.assertFalse(adapter._pending_drain_scheduled)
+
+    @patch.dict(os.environ, {}, clear=True)
+    def test_queue_cap_evicts_oldest_beyond_max_depth(self):
+        from gateway.config import PlatformConfig
+        from gateway.platforms.feishu import FeishuAdapter
+
+        adapter = FeishuAdapter(PlatformConfig())
+        adapter._loop = None
+        adapter._pending_inbound_max_depth = 3  # Shrink for test
+
+        with patch("gateway.platforms.feishu.threading.Thread"):
+            for i in range(5):
+                adapter._on_message_event(SimpleNamespace(tag=f"evt-{i}"))
+
+        # Only the last 3 should remain; evt-0 and evt-1 dropped.
+        self.assertEqual(len(adapter._pending_inbound_events), 3)
+        tags = [getattr(e, "tag", None) for e in adapter._pending_inbound_events]
+        self.assertEqual(tags, ["evt-2", "evt-3", "evt-4"])
+
+    @patch.dict(os.environ, {}, clear=True)
+    def test_normal_path_unchanged_when_loop_ready(self):
+        """When the loop is ready, events should dispatch directly without
+        ever touching the pending queue."""
+        from gateway.config import PlatformConfig
+        from gateway.platforms.feishu import FeishuAdapter
+
+        adapter = FeishuAdapter(PlatformConfig())
+
+        class _ReadyLoop:
+            def is_closed(self):
+                return False
+
+        adapter._loop = _ReadyLoop()
+
+        future = SimpleNamespace(add_done_callback=lambda *_a, **_kw: None)
+
+        def _submit(coro, _loop):
+            coro.close()
+            return future
+
+        with patch(
+            "gateway.platforms.feishu.asyncio.run_coroutine_threadsafe",
+            side_effect=_submit,
+        ) as submit, patch(
+            "gateway.platforms.feishu.threading.Thread"
+        ) as thread_cls:
+            adapter._on_message_event(SimpleNamespace(tag="evt"))
+
+        self.assertEqual(submit.call_count, 1)
+        self.assertEqual(len(adapter._pending_inbound_events), 0)
+        self.assertFalse(adapter._pending_drain_scheduled)
+        # No drainer thread spawned when the happy path runs.
+        self.assertEqual(thread_cls.call_count, 0)
+
+
 @unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed")
 class TestWebhookSecurity(unittest.TestCase):
     """Tests for webhook signature verification, rate limiting, and body size limits."""