fix(feishu): queue inbound events when adapter loop not ready (#5499) (#11372)

Inbound Feishu messages arriving during brief windows when the adapter
loop is unavailable (startup/restart transitions, network-flap reconnect)
were silently dropped with a WARNING log. This matches the symptom in
issue #5499 — and users have reported seeing only a subset of their
messages reach the agent.

Fix: queue pending events in a thread-safe list and spawn a single
drainer thread that replays them once the loop becomes ready. Covers
these scenarios:

  * Queue events instead of dropping when loop is None/closed
  * Single drainer handles the full queue (not thread-per-event)
  * Thread-safe with threading.Lock on the queue and schedule flag
  * Handles mid-drain bursts (new events arrive while drainer is working)
  * Handles RuntimeError if loop closes between check and submit
  * Depth cap (1000) prevents unbounded growth during extended outages
  * Drops queue cleanly on disconnect rather than holding forever
  * Safety timeout (120s) prevents infinite retention on broken adapters

Based on the approach proposed in #4789 by milkoor, rewritten for
thread-safety and correctness.

Test plan:
  * 5 new unit tests (TestPendingInboundQueue) — all passing
  * E2E test with real asyncio loop + fake WS thread: 10-event burst
    before loop ready → all 10 delivered in order
  * E2E concurrent burst test: 20 events queued, 20 more arrive during
    drainer dispatch → all 40 delivered, no loss, no duplicates
  * All 111 existing feishu tests pass

Related: #5499, #4789

Co-authored-by: milkoor <milkoor@users.noreply.github.com>
This commit is contained in:
Teknium 2026-04-16 20:36:59 -07:00 committed by GitHub
parent 01906e99dd
commit 7af9bf3a54
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 287 additions and 3 deletions

View file

@ -1073,6 +1073,13 @@ class FeishuAdapter(BasePlatformAdapter):
self._webhook_rate_counts: Dict[str, tuple[int, float]] = {} # rate_key → (count, window_start)
self._webhook_anomaly_counts: Dict[str, tuple[int, str, float]] = {} # ip → (count, last_status, first_seen)
self._card_action_tokens: Dict[str, float] = {} # token → first_seen_time
# Inbound events that arrived before the adapter loop was ready
# (e.g. during startup/restart or network-flap reconnect). A single
# drainer thread replays them as soon as the loop becomes available.
self._pending_inbound_events: List[Any] = []
self._pending_inbound_lock = threading.Lock()
self._pending_drain_scheduled = False
self._pending_inbound_max_depth = 1000 # cap queue; drop oldest beyond
self._chat_locks: Dict[str, asyncio.Lock] = {} # chat_id → lock (per-chat serial processing)
self._sent_message_ids_to_chat: Dict[str, str] = {} # message_id → chat_id (for reaction routing)
self._sent_message_id_order: List[str] = [] # LRU order for _sent_message_ids_to_chat
@ -1757,10 +1764,22 @@ class FeishuAdapter(BasePlatformAdapter):
# =========================================================================
def _on_message_event(self, data: Any) -> None:
"""Normalize Feishu inbound events into MessageEvent."""
"""Normalize Feishu inbound events into MessageEvent.
Called by the lark_oapi SDK's event dispatcher on a background thread.
If the adapter loop is not currently accepting callbacks (brief window
during startup/restart or network-flap reconnect), the event is queued
for replay instead of dropped.
"""
loop = self._loop
if loop is None or bool(getattr(loop, "is_closed", lambda: False)()):
logger.warning("[Feishu] Dropping inbound message before adapter loop is ready")
if not self._loop_accepts_callbacks(loop):
start_drainer = self._enqueue_pending_inbound_event(data)
if start_drainer:
threading.Thread(
target=self._drain_pending_inbound_events,
name="feishu-pending-inbound-drainer",
daemon=True,
).start()
return
future = asyncio.run_coroutine_threadsafe(
self._handle_message_event_data(data),
@ -1768,6 +1787,124 @@ class FeishuAdapter(BasePlatformAdapter):
)
future.add_done_callback(self._log_background_failure)
def _enqueue_pending_inbound_event(self, data: Any) -> bool:
"""Append an event to the pending-inbound queue.
Returns True if the caller should spawn a drainer thread (no drainer
currently scheduled), False if a drainer is already running and will
pick up the new event on its next pass.
"""
with self._pending_inbound_lock:
if len(self._pending_inbound_events) >= self._pending_inbound_max_depth:
# Queue full — drop the oldest to make room. This happens only
# if the loop stays unavailable for an extended period AND the
# WS keeps firing callbacks. Still better than silent drops.
dropped = self._pending_inbound_events.pop(0)
try:
event = getattr(dropped, "event", None)
message = getattr(event, "message", None)
message_id = str(getattr(message, "message_id", "") or "unknown")
except Exception:
message_id = "unknown"
logger.error(
"[Feishu] Pending-inbound queue full (%d); dropped oldest event %s",
self._pending_inbound_max_depth,
message_id,
)
self._pending_inbound_events.append(data)
depth = len(self._pending_inbound_events)
should_start = not self._pending_drain_scheduled
if should_start:
self._pending_drain_scheduled = True
logger.warning(
"[Feishu] Queued inbound event for replay (loop not ready, queue depth=%d)",
depth,
)
return should_start
def _drain_pending_inbound_events(self) -> None:
"""Replay queued inbound events once the adapter loop is ready.
Runs in a dedicated daemon thread. Polls ``_running`` and
``_loop_accepts_callbacks`` until events can be dispatched or the
adapter shuts down. A single drainer handles the entire queue;
concurrent ``_on_message_event`` calls just append.
"""
poll_interval = 0.25
max_wait_seconds = 120.0 # safety cap: drop queue after 2 minutes
waited = 0.0
try:
while True:
if not getattr(self, "_running", True):
# Adapter shutting down — drop queued events rather than
# holding them against a closed loop.
with self._pending_inbound_lock:
dropped = len(self._pending_inbound_events)
self._pending_inbound_events.clear()
if dropped:
logger.warning(
"[Feishu] Dropped %d queued inbound event(s) during shutdown",
dropped,
)
return
loop = self._loop
if self._loop_accepts_callbacks(loop):
with self._pending_inbound_lock:
batch = self._pending_inbound_events[:]
self._pending_inbound_events.clear()
if not batch:
# Queue emptied between check and grab; done.
with self._pending_inbound_lock:
if not self._pending_inbound_events:
return
continue
dispatched = 0
requeue: List[Any] = []
for event in batch:
try:
fut = asyncio.run_coroutine_threadsafe(
self._handle_message_event_data(event),
loop,
)
fut.add_done_callback(self._log_background_failure)
dispatched += 1
except RuntimeError:
# Loop closed between check and submit — requeue
# and poll again.
requeue.append(event)
if requeue:
with self._pending_inbound_lock:
self._pending_inbound_events[:0] = requeue
if dispatched:
logger.info(
"[Feishu] Replayed %d queued inbound event(s)",
dispatched,
)
if not requeue:
# Successfully drained; check if more arrived while
# we were dispatching and exit if not.
with self._pending_inbound_lock:
if not self._pending_inbound_events:
return
# More events queued or requeue pending — loop again.
continue
if waited >= max_wait_seconds:
with self._pending_inbound_lock:
dropped = len(self._pending_inbound_events)
self._pending_inbound_events.clear()
logger.error(
"[Feishu] Adapter loop unavailable for %.0fs; "
"dropped %d queued inbound event(s)",
max_wait_seconds,
dropped,
)
return
time.sleep(poll_interval)
waited += poll_interval
finally:
with self._pending_inbound_lock:
self._pending_drain_scheduled = False
async def _handle_message_event_data(self, data: Any) -> None:
"""Shared inbound message handling for websocket and webhook transports."""
event = getattr(data, "event", None)

View file

@ -53,6 +53,7 @@ AUTHOR_MAP = {
"126368201+vilkasdev@users.noreply.github.com": "vilkasdev",
"137614867+cutepawss@users.noreply.github.com": "cutepawss",
"96793918+memosr@users.noreply.github.com": "memosr",
"milkoor@users.noreply.github.com": "milkoor",
"131039422+SHL0MS@users.noreply.github.com": "SHL0MS",
"77628552+raulvidis@users.noreply.github.com": "raulvidis",
"145567217+Aum08Desai@users.noreply.github.com": "Aum08Desai",

View file

@ -2536,6 +2536,152 @@ class TestAdapterBehavior(unittest.TestCase):
)
@unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed")
class TestPendingInboundQueue(unittest.TestCase):
"""Tests for the loop-not-ready race (#5499): inbound events arriving
before or during adapter loop transitions must be queued for replay
rather than silently dropped."""
@patch.dict(os.environ, {}, clear=True)
def test_event_queued_when_loop_not_ready(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._loop = None # Simulate "before start()" or "during reconnect"
with patch("gateway.platforms.feishu.threading.Thread") as thread_cls:
adapter._on_message_event(SimpleNamespace(tag="evt-1"))
adapter._on_message_event(SimpleNamespace(tag="evt-2"))
adapter._on_message_event(SimpleNamespace(tag="evt-3"))
# All three queued, none dropped.
self.assertEqual(len(adapter._pending_inbound_events), 3)
# Only ONE drainer thread scheduled, not one per event.
self.assertEqual(thread_cls.call_count, 1)
# Drain scheduled flag set.
self.assertTrue(adapter._pending_drain_scheduled)
@patch.dict(os.environ, {}, clear=True)
def test_drainer_replays_queued_events_when_loop_becomes_ready(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._loop = None
adapter._running = True
class _ReadyLoop:
def is_closed(self):
return False
# Queue three events while loop is None (simulate the race).
events = [SimpleNamespace(tag=f"evt-{i}") for i in range(3)]
with patch("gateway.platforms.feishu.threading.Thread"):
for ev in events:
adapter._on_message_event(ev)
self.assertEqual(len(adapter._pending_inbound_events), 3)
# Now the loop becomes ready; run the drainer inline (not as a thread)
# to verify it replays the queue.
adapter._loop = _ReadyLoop()
future = SimpleNamespace(add_done_callback=lambda *_a, **_kw: None)
submitted: list = []
def _submit(coro, _loop):
submitted.append(coro)
coro.close()
return future
with patch(
"gateway.platforms.feishu.asyncio.run_coroutine_threadsafe",
side_effect=_submit,
) as submit:
adapter._drain_pending_inbound_events()
# All three events dispatched to the loop.
self.assertEqual(submit.call_count, 3)
# Queue emptied.
self.assertEqual(len(adapter._pending_inbound_events), 0)
# Drain flag reset so a future race can schedule a new drainer.
self.assertFalse(adapter._pending_drain_scheduled)
@patch.dict(os.environ, {}, clear=True)
def test_drainer_drops_queue_when_adapter_shuts_down(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._loop = None
adapter._running = False # Shutdown state
with patch("gateway.platforms.feishu.threading.Thread"):
adapter._on_message_event(SimpleNamespace(tag="evt-lost"))
self.assertEqual(len(adapter._pending_inbound_events), 1)
# Drainer should drop the queue immediately since _running is False.
adapter._drain_pending_inbound_events()
self.assertEqual(len(adapter._pending_inbound_events), 0)
self.assertFalse(adapter._pending_drain_scheduled)
@patch.dict(os.environ, {}, clear=True)
def test_queue_cap_evicts_oldest_beyond_max_depth(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._loop = None
adapter._pending_inbound_max_depth = 3 # Shrink for test
with patch("gateway.platforms.feishu.threading.Thread"):
for i in range(5):
adapter._on_message_event(SimpleNamespace(tag=f"evt-{i}"))
# Only the last 3 should remain; evt-0 and evt-1 dropped.
self.assertEqual(len(adapter._pending_inbound_events), 3)
tags = [getattr(e, "tag", None) for e in adapter._pending_inbound_events]
self.assertEqual(tags, ["evt-2", "evt-3", "evt-4"])
@patch.dict(os.environ, {}, clear=True)
def test_normal_path_unchanged_when_loop_ready(self):
"""When the loop is ready, events should dispatch directly without
ever touching the pending queue."""
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
class _ReadyLoop:
def is_closed(self):
return False
adapter._loop = _ReadyLoop()
future = SimpleNamespace(add_done_callback=lambda *_a, **_kw: None)
def _submit(coro, _loop):
coro.close()
return future
with patch(
"gateway.platforms.feishu.asyncio.run_coroutine_threadsafe",
side_effect=_submit,
) as submit, patch(
"gateway.platforms.feishu.threading.Thread"
) as thread_cls:
adapter._on_message_event(SimpleNamespace(tag="evt"))
self.assertEqual(submit.call_count, 1)
self.assertEqual(len(adapter._pending_inbound_events), 0)
self.assertFalse(adapter._pending_drain_scheduled)
# No drainer thread spawned when the happy path runs.
self.assertEqual(thread_cls.call_count, 0)
@unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed")
class TestWebhookSecurity(unittest.TestCase):
"""Tests for webhook signature verification, rate limiting, and body size limits."""