diff --git a/gateway/run.py b/gateway/run.py index 1650851fb75..1c29a593e3c 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -402,6 +402,17 @@ async def _send_or_update_status_coro(adapter, chat_id, status_key, content, met return await adapter.send(chat_id, content, metadata=metadata) +def _resolve_progress_thread_id(platform: Any, source_thread_id: Any, event_message_id: Any) -> Optional[str]: + """Return thread/root ID that progress/status bubbles should target.""" + platform_value = getattr(platform, "value", platform) + platform_key = str(platform_value or "").lower() + if source_thread_id: + return str(source_thread_id) + if platform_key in {"slack", "mattermost"} and event_message_id: + return str(event_message_id) + return None + + def _telegramize_command_mentions(text: str, platform: Any) -> str: """Rewrite slash-command mentions to Telegram-valid command names. @@ -13884,10 +13895,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # - Feishu only honors reply_in_thread when sending a reply, so topic # progress uses the triggering event message as the reply target # - Other platforms should use explicit source.thread_id only - if source.platform == Platform.SLACK: - _progress_thread_id = source.thread_id or event_message_id - else: - _progress_thread_id = source.thread_id + _progress_thread_id = _resolve_progress_thread_id( + source.platform, source.thread_id, event_message_id, + ) _progress_metadata = ( self._thread_metadata_for_source(source, event_message_id) if _progress_thread_id == source.thread_id diff --git a/plugins/platforms/mattermost/adapter.py b/plugins/platforms/mattermost/adapter.py index bb6dc9b81f2..bc2280cb6d2 100644 --- a/plugins/platforms/mattermost/adapter.py +++ b/plugins/platforms/mattermost/adapter.py @@ -96,6 +96,9 @@ class MattermostAdapter(BasePlatformAdapter): or os.getenv("MATTERMOST_REPLY_MODE", "off") ).lower() + self._last_post_status: Optional[int] = None + self._last_post_error: str = "" + # Dedup cache (prevent reprocessing) self._dedup = MessageDeduplicator() @@ -130,20 +133,79 @@ class MattermostAdapter(BasePlatformAdapter): """POST /api/v4/{path} with JSON body.""" import aiohttp url = f"{self._base_url}/api/v4/{path.lstrip('/')}" + self._last_post_status = None + self._last_post_error = "" try: async with self._session.post( url, headers=self._headers(), json=payload, timeout=aiohttp.ClientTimeout(total=30) ) as resp: + self._last_post_status = resp.status if resp.status >= 400: body = await resp.text() + self._last_post_error = body or "" logger.error("MM API POST %s → %s: %s", path, resp.status, body[:200]) return {} return await resp.json() except aiohttp.ClientError as exc: + self._last_post_error = str(exc) logger.error("MM API POST %s network error: %s", path, exc) return {} + async def _thread_root_for_send( + self, + reply_to: Optional[str], + metadata: Optional[Dict[str, Any]], + ) -> Optional[str]: + """Resolve the Mattermost root_id from reply_to or metadata.""" + if self._reply_mode != "thread": + return None + candidate = reply_to + if not candidate and isinstance(metadata, dict): + candidate = metadata.get("thread_id") or metadata.get("root_id") + if not candidate: + return None + return await self._resolve_root_id(str(candidate)) + + def _last_post_failure_is_broken_thread_root(self) -> bool: + """Return True only for clear invalid/missing Mattermost thread roots.""" + if self._last_post_status not in {400, 404}: + return False + body = (self._last_post_error or "").lower() + if not body: + return False + rootish = any(marker in body for marker in ("root_id", "rootid", "root id", "thread", "post")) + broken = any(marker in body for marker in ("invalid", "not found", "does not exist", "missing")) + return rootish and broken + + async def _post_preserving_thread( + self, + chat_id: str, + payload: Dict[str, Any], + metadata: Optional[Dict[str, Any]], + ) -> Dict[str, Any]: + """Post once, optionally falling back flat for final notify content.""" + data = await self._api_post("posts", payload) + if data or "root_id" not in payload: + return data + if not (isinstance(metadata, dict) and metadata.get("notify")): + return data + if not self._last_post_failure_is_broken_thread_root(): + return data + + flat_payload = dict(payload) + flat_payload.pop("root_id", None) + original = str(flat_payload.get("message") or "") + flat_payload["message"] = ( + "⚠️ Mattermost thread delivery failed; posting final reply in channel.\n\n" + + original + ).strip() + logger.warning( + "Mattermost: falling back to flat channel delivery for notify-worthy post in %s", + chat_id, + ) + return await self._api_post("posts", flat_payload) + async def _api_put( self, path: str, payload: Dict[str, Any] ) -> Dict[str, Any]: @@ -286,14 +348,12 @@ class MattermostAdapter(BasePlatformAdapter): "channel_id": chat_id, "message": chunk, } - # Thread support: reply_to is the root post ID. - if reply_to and self._reply_mode == "thread": - # Ensure root_id points to the thread root, not a reply. - # Mattermost rejects non-root post IDs as root_id. - resolved_root = await self._resolve_root_id(reply_to) + # Thread support: reply_to or metadata["thread_id"] is the root post ID. + resolved_root = await self._thread_root_for_send(reply_to, metadata) + if resolved_root: payload["root_id"] = resolved_root - data = await self._api_post("posts", payload) + data = await self._post_preserving_thread(chat_id, payload, metadata) if not data or "id" not in data: return SendResult(success=False, error="Failed to create post") last_id = data["id"] @@ -346,7 +406,7 @@ class MattermostAdapter(BasePlatformAdapter): ) -> SendResult: """Download an image and upload it as a file attachment.""" return await self._send_url_as_file( - chat_id, image_url, caption, reply_to, "image" + chat_id, image_url, caption, reply_to, "image", metadata ) async def send_image_file( @@ -359,7 +419,7 @@ class MattermostAdapter(BasePlatformAdapter): ) -> SendResult: """Upload a local image file.""" return await self._send_local_file( - chat_id, image_path, caption, reply_to + chat_id, image_path, caption, reply_to, metadata=metadata ) async def send_document( @@ -373,7 +433,7 @@ class MattermostAdapter(BasePlatformAdapter): ) -> SendResult: """Upload a local file as a document.""" return await self._send_local_file( - chat_id, file_path, caption, reply_to, file_name + chat_id, file_path, caption, reply_to, file_name, metadata ) async def send_voice( @@ -386,7 +446,7 @@ class MattermostAdapter(BasePlatformAdapter): ) -> SendResult: """Upload an audio file.""" return await self._send_local_file( - chat_id, audio_path, caption, reply_to + chat_id, audio_path, caption, reply_to, metadata=metadata ) async def send_video( @@ -399,7 +459,7 @@ class MattermostAdapter(BasePlatformAdapter): ) -> SendResult: """Upload a video file.""" return await self._send_local_file( - chat_id, video_path, caption, reply_to + chat_id, video_path, caption, reply_to, metadata=metadata ) def format_message(self, content: str) -> str: @@ -423,12 +483,13 @@ class MattermostAdapter(BasePlatformAdapter): caption: Optional[str], reply_to: Optional[str], kind: str = "file", + metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: """Download a URL and upload it as a file attachment.""" from tools.url_safety import is_safe_url if not is_safe_url(url): logger.warning("Mattermost: blocked unsafe URL (SSRF protection)") - return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to) + return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to, metadata=metadata) import aiohttp @@ -446,7 +507,7 @@ class MattermostAdapter(BasePlatformAdapter): await asyncio.sleep(1.5 * (attempt + 1)) continue if resp.status >= 400: - return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to) + return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to, metadata=metadata) file_data = await resp.read() ct = resp.content_type or "application/octet-stream" break @@ -455,25 +516,26 @@ class MattermostAdapter(BasePlatformAdapter): await asyncio.sleep(1.5 * (attempt + 1)) continue logger.warning("Mattermost: failed to download %s after %d attempts: %s", url, attempt + 1, exc) - return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to) + return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to, metadata=metadata) if file_data is None: logger.warning("Mattermost: download returned no data for %s", url) - return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to) + return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to, metadata=metadata) file_id = await self._upload_file(chat_id, file_data, fname, ct) if not file_id: - return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to) + return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to, metadata=metadata) payload: Dict[str, Any] = { "channel_id": chat_id, "message": caption or "", "file_ids": [file_id], } - if reply_to and self._reply_mode == "thread": - payload["root_id"] = await self._resolve_root_id(reply_to) + resolved_root = await self._thread_root_for_send(reply_to, metadata) + if resolved_root: + payload["root_id"] = resolved_root - data = await self._api_post("posts", payload) + data = await self._post_preserving_thread(chat_id, payload, metadata) if not data or "id" not in data: return SendResult(success=False, error="Failed to post with file") return SendResult(success=True, message_id=data["id"]) @@ -485,6 +547,7 @@ class MattermostAdapter(BasePlatformAdapter): caption: Optional[str], reply_to: Optional[str], file_name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: """Upload a local file and attach it to a post.""" import mimetypes @@ -509,10 +572,11 @@ class MattermostAdapter(BasePlatformAdapter): "message": caption or "", "file_ids": [file_id], } - if reply_to and self._reply_mode == "thread": - payload["root_id"] = await self._resolve_root_id(reply_to) + resolved_root = await self._thread_root_for_send(reply_to, metadata) + if resolved_root: + payload["root_id"] = resolved_root - data = await self._api_post("posts", payload) + data = await self._post_preserving_thread(chat_id, payload, metadata) if not data or "id" not in data: return SendResult(success=False, error="Failed to post with file") return SendResult(success=True, message_id=data["id"]) @@ -596,11 +660,14 @@ class MattermostAdapter(BasePlatformAdapter): "message": "\n".join(caption_parts), "file_ids": file_ids, } + resolved_root = await self._thread_root_for_send(None, metadata) + if resolved_root: + payload["root_id"] = resolved_root logger.info( "Mattermost: sending %d image(s) as single post (chunk %d/%d)", len(file_ids), chunk_idx + 1, len(chunks), ) - data = await self._api_post("posts", payload) + data = await self._post_preserving_thread(chat_id, payload, metadata) if not data or "id" not in data: logger.warning("Mattermost: multi-image post failed, falling back") await super().send_multiple_images(chat_id, chunk, metadata, human_delay=human_delay) @@ -786,8 +853,16 @@ class MattermostAdapter(BasePlatformAdapter): sender_id = post.get("user_id", "") sender_name = data.get("sender_name", "").lstrip("@") or sender_id - # Thread support: if the post is in a thread, use root_id. + # Thread support: if the post is in a thread, use root_id. In + # thread mode, top-level channel posts are valid roots for progress. thread_id = post.get("root_id") or None + if ( + not thread_id + and self._reply_mode == "thread" + and channel_type_raw != "D" + and post_id + ): + thread_id = post_id # Determine message type. file_ids = post.get("file_ids") or [] @@ -849,6 +924,7 @@ class MattermostAdapter(BasePlatformAdapter): user_id=sender_id, user_name=sender_name, thread_id=thread_id, + message_id=post_id, ) # Per-channel ephemeral prompt diff --git a/tests/gateway/test_mattermost.py b/tests/gateway/test_mattermost.py index cafe5ad68a4..9b174a5137a 100644 --- a/tests/gateway/test_mattermost.py +++ b/tests/gateway/test_mattermost.py @@ -6,6 +6,30 @@ import pytest from unittest.mock import MagicMock, patch, AsyncMock from gateway.config import Platform, PlatformConfig +from gateway.run import _resolve_progress_thread_id + + +class TestMattermostProgressThreadRouting: + def test_top_level_mattermost_progress_uses_event_message_id(self): + assert _resolve_progress_thread_id( + Platform.MATTERMOST, + source_thread_id=None, + event_message_id="top_post_123", + ) == "top_post_123" + + def test_threaded_mattermost_progress_prefers_existing_thread_root(self): + assert _resolve_progress_thread_id( + Platform.MATTERMOST, + source_thread_id="root_post_123", + event_message_id="reply_post_456", + ) == "root_post_123" + + def test_telegram_progress_does_not_use_message_id_as_thread_id(self): + assert _resolve_progress_thread_id( + Platform.TELEGRAM, + source_thread_id=None, + event_message_id="12345", + ) is None # --------------------------------------------------------------------------- @@ -237,6 +261,92 @@ class TestMattermostSend: payload = self.adapter._session.post.call_args[1]["json"] assert "root_id" not in payload + + @pytest.mark.asyncio + async def test_send_uses_metadata_thread_id_for_progress_messages(self): + """Progress/status messages pass Mattermost thread context via metadata.""" + self.adapter._reply_mode = "thread" + self.adapter._api_get = AsyncMock(return_value={"id": "root_post_123", "root_id": ""}) + self.adapter._api_post = AsyncMock(return_value={"id": "progress_post"}) + + result = await self.adapter.send( + "channel_1", + "⚡ terminal...", + metadata={"thread_id": "root_post_123"}, + ) + + assert result.success is True + payload = self.adapter._api_post.call_args_list[0][0][1] + assert payload["root_id"] == "root_post_123" + + @pytest.mark.asyncio + async def test_progress_send_with_invalid_thread_root_never_falls_back_flat(self): + """Tool/status/progress bubbles must stay quiet when the thread is broken.""" + self.adapter._reply_mode = "thread" + self.adapter._api_get = AsyncMock(return_value={"id": "bad_root", "root_id": ""}) + self.adapter._last_post_status = 400 + self.adapter._last_post_error = "api.context.invalid_param.app_error: invalid root_id" + self.adapter._api_post = AsyncMock(return_value={}) + + result = await self.adapter.send( + "channel_1", + "⚙️ terminal...", + metadata={"thread_id": "bad_root"}, + ) + + assert result.success is False + assert self.adapter._api_post.call_count == 1 + payload = self.adapter._api_post.call_args_list[0][0][1] + assert payload["root_id"] == "bad_root" + + @pytest.mark.asyncio + async def test_notify_send_with_invalid_thread_root_falls_back_flat_with_warning(self): + """Notify-worthy replies may fall back flat so the answer is not lost.""" + self.adapter._reply_mode = "thread" + self.adapter._api_get = AsyncMock(return_value={"id": "bad_root", "root_id": ""}) + self.adapter._last_post_status = 400 + self.adapter._last_post_error = "api.context.invalid_param.app_error: invalid root_id" + self.adapter._api_post = AsyncMock(side_effect=[{}, {"id": "flat_final"}]) + + result = await self.adapter.send( + "channel_1", + "Final answer body", + reply_to="bad_root", + metadata={"notify": True}, + ) + + assert result.success is True + assert result.message_id == "flat_final" + assert self.adapter._api_post.call_count == 2 + threaded_payload = self.adapter._api_post.call_args_list[0][0][1] + flat_payload = self.adapter._api_post.call_args_list[1][0][1] + assert threaded_payload["root_id"] == "bad_root" + assert "root_id" not in flat_payload + assert flat_payload["channel_id"] == "channel_1" + assert "Mattermost thread delivery failed" in flat_payload["message"] + assert "Final answer body" in flat_payload["message"] + + @pytest.mark.asyncio + async def test_notify_send_with_server_error_does_not_fall_back_flat(self): + """Notify fallback is only for broken thread roots, not generic API failures.""" + self.adapter._reply_mode = "thread" + self.adapter._api_get = AsyncMock(return_value={"id": "root_post", "root_id": ""}) + self.adapter._last_post_status = 500 + self.adapter._last_post_error = "Internal Server Error" + self.adapter._api_post = AsyncMock(return_value={}) + + result = await self.adapter.send( + "channel_1", + "Final answer body", + reply_to="root_post", + metadata={"notify": True}, + ) + + assert result.success is False + assert self.adapter._api_post.call_count == 1 + payload = self.adapter._api_post.call_args_list[0][0][1] + assert payload["root_id"] == "root_post" + @pytest.mark.asyncio async def test_send_api_failure(self): """When API returns error, send should return failure.""" @@ -750,3 +860,65 @@ class TestMattermostMediaTypes: assert msg.media_types == ["application/pdf"] assert not msg.media_types[0].startswith("image/") assert not msg.media_types[0].startswith("audio/") + + + +@pytest.mark.asyncio +async def test_mattermost_top_level_channel_post_is_thread_root(): + adapter = _make_adapter() + adapter._reply_mode = "thread" + adapter._bot_user_id = "bot_user_id" + adapter._bot_username = "hermes-bot" + adapter.handle_message = AsyncMock() + post_data = { + "id": "top_post_123", + "user_id": "user_123", + "channel_id": "chan_456", + "message": "@hermes-bot start work", + "root_id": "", + } + event = { + "event": "posted", + "data": { + "post": json.dumps(post_data), + "channel_type": "O", + "sender_name": "@alice", + }, + } + + await adapter._handle_ws_event(event) + + msg_event = adapter.handle_message.call_args[0][0] + assert msg_event.source.thread_id == "top_post_123" + assert msg_event.source.message_id == "top_post_123" + assert msg_event.message_id == "top_post_123" + + +@pytest.mark.asyncio +async def test_mattermost_dm_post_does_not_seed_thread_root(): + adapter = _make_adapter() + adapter._reply_mode = "thread" + adapter._bot_user_id = "bot_user_id" + adapter._bot_username = "hermes-bot" + adapter.handle_message = AsyncMock() + post_data = { + "id": "dm_post_123", + "user_id": "user_123", + "channel_id": "dm_chan", + "message": "hello", + "root_id": "", + } + event = { + "event": "posted", + "data": { + "post": json.dumps(post_data), + "channel_type": "D", + "sender_name": "@alice", + }, + } + + await adapter._handle_ws_event(event) + + msg_event = adapter.handle_message.call_args[0][0] + assert msg_event.source.thread_id is None + assert msg_event.source.message_id == "dm_post_123"