diff --git a/gateway/platforms/weixin.py b/gateway/platforms/weixin.py index adb6d21a0e0..86358392c20 100644 --- a/gateway/platforms/weixin.py +++ b/gateway/platforms/weixin.py @@ -1810,10 +1810,47 @@ class WeixinAdapter(BasePlatformAdapter): logger.error("[%s] send failed to=%s: %s", self.name, _safe_id(chat_id), exc) return SendResult(success=False, error=str(exc)) + async def _ensure_typing_ticket(self, chat_id: str) -> Optional[str]: + """Return a valid typing ticket, refreshing from getConfig if expired. + + The iLink typing ticket has a 600-second TTL. When a long-running + session exceeds that window the cached ticket evicts, and both + ``send_typing`` and ``stop_typing`` silently no-op — leaving the + WeChat client stuck showing the typing indicator forever. This + method transparently refreshes the ticket so the stop signal can + always be delivered. + """ + ticket = self._typing_cache.get(chat_id) + if ticket: + return ticket + if not self._send_session or not self._token: + return None + # Ticket expired or never fetched — refresh via getConfig. + # Use the most recent context_token for this peer if available. + context_token = self._token_store.get(self._account_id, chat_id) + try: + response = await _get_config( + self._send_session, + base_url=self._base_url, + token=self._token, + user_id=chat_id, + context_token=context_token, + ) + typing_ticket = str(response.get("typing_ticket") or "") + if typing_ticket: + self._typing_cache.set(chat_id, typing_ticket) + return typing_ticket + except Exception as exc: + logger.debug( + "[%s] typing ticket refresh failed for %s: %s", + self.name, _safe_id(chat_id), exc, + ) + return None + async def send_typing(self, chat_id: str, metadata: Optional[Dict[str, Any]] = None) -> None: if not self._send_session or not self._token: return - typing_ticket = self._typing_cache.get(chat_id) + typing_ticket = await self._ensure_typing_ticket(chat_id) if not typing_ticket: return try: @@ -1831,7 +1868,7 @@ class WeixinAdapter(BasePlatformAdapter): async def stop_typing(self, chat_id: str) -> None: if not self._send_session or not self._token: return - typing_ticket = self._typing_cache.get(chat_id) + typing_ticket = await self._ensure_typing_ticket(chat_id) if not typing_ticket: return try: diff --git a/tests/gateway/test_weixin_typing.py b/tests/gateway/test_weixin_typing.py new file mode 100644 index 00000000000..146b3cbd708 --- /dev/null +++ b/tests/gateway/test_weixin_typing.py @@ -0,0 +1,190 @@ +"""Tests for WeChat iLink typing ticket refresh logic (issue #38085).""" + +import asyncio +import time +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +@pytest.fixture +def weixin_adapter(): + """Create a minimal WeixinAdapter with mocked internals for typing tests.""" + from gateway.platforms.weixin import WeixinAdapter, TypingTicketCache + + config = MagicMock() + config.extra = {"account_id": "test-account"} + config.name = "weixin" + + with patch.object(WeixinAdapter, "__init__", lambda self, cfg: None): + adapter = WeixinAdapter.__new__(WeixinAdapter) + adapter._send_session = AsyncMock() + adapter._token = "test-token" + adapter._base_url = "https://ilinkai.weixin.qq.com" + adapter._account_id = "test-account" + adapter._typing_cache = TypingTicketCache(ttl_seconds=600.0) + adapter._token_store = MagicMock() + adapter._token_store.get.return_value = None # no stored context_token + adapter.platform = MagicMock() + mock_value = MagicMock() + mock_value.title.return_value = "Weixin" + adapter.platform.value = mock_value + + return adapter + + +class TestEnsureTypingTicket: + """Tests for _ensure_typing_ticket — the fix for stuck typing indicator.""" + + @pytest.mark.asyncio + async def test_returns_cached_ticket_when_fresh(self, weixin_adapter): + """If the cached ticket is still valid, return it without refreshing.""" + weixin_adapter._typing_cache.set("user-123", "cached-ticket-abc") + ticket = await weixin_adapter._ensure_typing_ticket("user-123") + assert ticket == "cached-ticket-abc" + + @pytest.mark.asyncio + async def test_refreshes_when_ticket_expired(self, weixin_adapter): + """When the cached ticket has expired, fetch a new one via getConfig.""" + # Insert an expired ticket directly (bypass TTL check) + weixin_adapter._typing_cache._cache["user-123"] = ( + "old-ticket", + time.time() - 601, # expired (TTL is 600s) + ) + + mock_response = {"typing_ticket": "fresh-ticket-xyz"} + with patch("gateway.platforms.weixin._get_config", new_callable=AsyncMock) as mock_get: + mock_get.return_value = mock_response + ticket = await weixin_adapter._ensure_typing_ticket("user-123") + + assert ticket == "fresh-ticket-xyz" + mock_get.assert_called_once_with( + weixin_adapter._send_session, + base_url=weixin_adapter._base_url, + token=weixin_adapter._token, + user_id="user-123", + context_token=None, + ) + + @pytest.mark.asyncio + async def test_refreshes_when_no_cached_ticket(self, weixin_adapter): + """When there is no cached ticket at all, fetch a new one.""" + mock_response = {"typing_ticket": "new-ticket"} + with patch("gateway.platforms.weixin._get_config", new_callable=AsyncMock) as mock_get: + mock_get.return_value = mock_response + ticket = await weixin_adapter._ensure_typing_ticket("user-456") + + assert ticket == "new-ticket" + + @pytest.mark.asyncio + async def test_uses_stored_context_token_when_available(self, weixin_adapter): + """Pass the stored context_token to getConfig when available.""" + weixin_adapter._token_store.get.return_value = "stored-ctx-token" + + mock_response = {"typing_ticket": "ticket-with-ctx"} + with patch("gateway.platforms.weixin._get_config", new_callable=AsyncMock) as mock_get: + mock_get.return_value = mock_response + ticket = await weixin_adapter._ensure_typing_ticket("user-789") + + assert ticket == "ticket-with-ctx" + mock_get.assert_called_once_with( + weixin_adapter._send_session, + base_url=weixin_adapter._base_url, + token=weixin_adapter._token, + user_id="user-789", + context_token="stored-ctx-token", + ) + + @pytest.mark.asyncio + async def test_returns_none_when_no_session(self, weixin_adapter): + """Return None when there is no send session.""" + weixin_adapter._send_session = None + ticket = await weixin_adapter._ensure_typing_ticket("user-123") + assert ticket is None + + @pytest.mark.asyncio + async def test_returns_none_when_getconfig_fails(self, weixin_adapter): + """Return None when getConfig raises an exception.""" + with patch("gateway.platforms.weixin._get_config", new_callable=AsyncMock) as mock_get: + mock_get.side_effect = Exception("network error") + ticket = await weixin_adapter._ensure_typing_ticket("user-123") + + assert ticket is None + + @pytest.mark.asyncio + async def test_returns_none_when_getconfig_returns_empty_ticket(self, weixin_adapter): + """Return None when getConfig returns no typing_ticket.""" + with patch("gateway.platforms.weixin._get_config", new_callable=AsyncMock) as mock_get: + mock_get.return_value = {"typing_ticket": ""} + ticket = await weixin_adapter._ensure_typing_ticket("user-123") + + assert ticket is None + + @pytest.mark.asyncio + async def test_stop_typing_refreshes_ticket(self, weixin_adapter): + """stop_typing should refresh the ticket when expired, not silently no-op.""" + # Expired ticket + weixin_adapter._typing_cache._cache["user-123"] = ( + "old-ticket", + time.time() - 601, + ) + + mock_response = {"typing_ticket": "refreshed-ticket"} + with patch("gateway.platforms.weixin._get_config", new_callable=AsyncMock) as mock_get, \ + patch("gateway.platforms.weixin._send_typing", new_callable=AsyncMock) as mock_send: + mock_get.return_value = mock_response + await weixin_adapter.stop_typing("user-123") + + # _send_typing should have been called with TYPING_STOP=2 + mock_send.assert_called_once() + call_kwargs = mock_send.call_args + assert call_kwargs.kwargs["typing_ticket"] == "refreshed-ticket" + assert call_kwargs.kwargs["status"] == 2 # TYPING_STOP + + @pytest.mark.asyncio + async def test_send_typing_refreshes_ticket(self, weixin_adapter): + """send_typing should refresh the ticket when expired.""" + # Expired ticket + weixin_adapter._typing_cache._cache["user-123"] = ( + "old-ticket", + time.time() - 601, + ) + + mock_response = {"typing_ticket": "refreshed-ticket"} + with patch("gateway.platforms.weixin._get_config", new_callable=AsyncMock) as mock_get, \ + patch("gateway.platforms.weixin._send_typing", new_callable=AsyncMock) as mock_send: + mock_get.return_value = mock_response + await weixin_adapter.send_typing("user-123") + + mock_send.assert_called_once() + call_kwargs = mock_send.call_args + assert call_kwargs.kwargs["typing_ticket"] == "refreshed-ticket" + assert call_kwargs.kwargs["status"] == 1 # TYPING_START + + +class TestTypingTicketCache: + """Tests for the TypingTicketCache TTL logic.""" + + def test_returns_ticket_when_fresh(self): + from gateway.platforms.weixin import TypingTicketCache + cache = TypingTicketCache(ttl_seconds=600.0) + cache.set("user-1", "ticket-1") + assert cache.get("user-1") == "ticket-1" + + def test_returns_none_when_expired(self): + from gateway.platforms.weixin import TypingTicketCache + cache = TypingTicketCache(ttl_seconds=600.0) + cache._cache["user-1"] = ("ticket-1", time.time() - 601) + assert cache.get("user-1") is None + + def test_returns_none_when_missing(self): + from gateway.platforms.weixin import TypingTicketCache + cache = TypingTicketCache(ttl_seconds=600.0) + assert cache.get("nonexistent") is None + + def test_expired_entry_is_removed_from_cache(self): + from gateway.platforms.weixin import TypingTicketCache + cache = TypingTicketCache(ttl_seconds=600.0) + cache._cache["user-1"] = ("ticket-1", time.time() - 601) + cache.get("user-1") + assert "user-1" not in cache._cache