fix(mattermost): preserve thread-local delivery hygiene

Salvage the valid thread-routing pieces from #41640:
- route Mattermost progress/status sends through metadata thread IDs
- treat top-level Mattermost channel posts as thread roots for progress
- preserve thread metadata through media/file sends
- allow flat fallback only for final notify-worthy replies on confirmed broken roots

Co-authored-by: Wolfram Ravenwolf <github.com@wolfram.ravenwolf.de>
This commit is contained in:
Teknium 2026-06-15 14:52:13 -07:00
parent d2b34e89b0
commit 5a0e0d35b9
3 changed files with 286 additions and 28 deletions

View file

@ -402,6 +402,17 @@ async def _send_or_update_status_coro(adapter, chat_id, status_key, content, met
return await adapter.send(chat_id, content, metadata=metadata)
def _resolve_progress_thread_id(platform: Any, source_thread_id: Any, event_message_id: Any) -> Optional[str]:
"""Return thread/root ID that progress/status bubbles should target."""
platform_value = getattr(platform, "value", platform)
platform_key = str(platform_value or "").lower()
if source_thread_id:
return str(source_thread_id)
if platform_key in {"slack", "mattermost"} and event_message_id:
return str(event_message_id)
return None
def _telegramize_command_mentions(text: str, platform: Any) -> str:
"""Rewrite slash-command mentions to Telegram-valid command names.
@ -13884,10 +13895,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
# - Feishu only honors reply_in_thread when sending a reply, so topic
# progress uses the triggering event message as the reply target
# - Other platforms should use explicit source.thread_id only
if source.platform == Platform.SLACK:
_progress_thread_id = source.thread_id or event_message_id
else:
_progress_thread_id = source.thread_id
_progress_thread_id = _resolve_progress_thread_id(
source.platform, source.thread_id, event_message_id,
)
_progress_metadata = (
self._thread_metadata_for_source(source, event_message_id)
if _progress_thread_id == source.thread_id

View file

@ -96,6 +96,9 @@ class MattermostAdapter(BasePlatformAdapter):
or os.getenv("MATTERMOST_REPLY_MODE", "off")
).lower()
self._last_post_status: Optional[int] = None
self._last_post_error: str = ""
# Dedup cache (prevent reprocessing)
self._dedup = MessageDeduplicator()
@ -130,20 +133,79 @@ class MattermostAdapter(BasePlatformAdapter):
"""POST /api/v4/{path} with JSON body."""
import aiohttp
url = f"{self._base_url}/api/v4/{path.lstrip('/')}"
self._last_post_status = None
self._last_post_error = ""
try:
async with self._session.post(
url, headers=self._headers(), json=payload,
timeout=aiohttp.ClientTimeout(total=30)
) as resp:
self._last_post_status = resp.status
if resp.status >= 400:
body = await resp.text()
self._last_post_error = body or ""
logger.error("MM API POST %s%s: %s", path, resp.status, body[:200])
return {}
return await resp.json()
except aiohttp.ClientError as exc:
self._last_post_error = str(exc)
logger.error("MM API POST %s network error: %s", path, exc)
return {}
async def _thread_root_for_send(
self,
reply_to: Optional[str],
metadata: Optional[Dict[str, Any]],
) -> Optional[str]:
"""Resolve the Mattermost root_id from reply_to or metadata."""
if self._reply_mode != "thread":
return None
candidate = reply_to
if not candidate and isinstance(metadata, dict):
candidate = metadata.get("thread_id") or metadata.get("root_id")
if not candidate:
return None
return await self._resolve_root_id(str(candidate))
def _last_post_failure_is_broken_thread_root(self) -> bool:
"""Return True only for clear invalid/missing Mattermost thread roots."""
if self._last_post_status not in {400, 404}:
return False
body = (self._last_post_error or "").lower()
if not body:
return False
rootish = any(marker in body for marker in ("root_id", "rootid", "root id", "thread", "post"))
broken = any(marker in body for marker in ("invalid", "not found", "does not exist", "missing"))
return rootish and broken
async def _post_preserving_thread(
self,
chat_id: str,
payload: Dict[str, Any],
metadata: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
"""Post once, optionally falling back flat for final notify content."""
data = await self._api_post("posts", payload)
if data or "root_id" not in payload:
return data
if not (isinstance(metadata, dict) and metadata.get("notify")):
return data
if not self._last_post_failure_is_broken_thread_root():
return data
flat_payload = dict(payload)
flat_payload.pop("root_id", None)
original = str(flat_payload.get("message") or "")
flat_payload["message"] = (
"⚠️ Mattermost thread delivery failed; posting final reply in channel.\n\n"
+ original
).strip()
logger.warning(
"Mattermost: falling back to flat channel delivery for notify-worthy post in %s",
chat_id,
)
return await self._api_post("posts", flat_payload)
async def _api_put(
self, path: str, payload: Dict[str, Any]
) -> Dict[str, Any]:
@ -286,14 +348,12 @@ class MattermostAdapter(BasePlatformAdapter):
"channel_id": chat_id,
"message": chunk,
}
# Thread support: reply_to is the root post ID.
if reply_to and self._reply_mode == "thread":
# Ensure root_id points to the thread root, not a reply.
# Mattermost rejects non-root post IDs as root_id.
resolved_root = await self._resolve_root_id(reply_to)
# Thread support: reply_to or metadata["thread_id"] is the root post ID.
resolved_root = await self._thread_root_for_send(reply_to, metadata)
if resolved_root:
payload["root_id"] = resolved_root
data = await self._api_post("posts", payload)
data = await self._post_preserving_thread(chat_id, payload, metadata)
if not data or "id" not in data:
return SendResult(success=False, error="Failed to create post")
last_id = data["id"]
@ -346,7 +406,7 @@ class MattermostAdapter(BasePlatformAdapter):
) -> SendResult:
"""Download an image and upload it as a file attachment."""
return await self._send_url_as_file(
chat_id, image_url, caption, reply_to, "image"
chat_id, image_url, caption, reply_to, "image", metadata
)
async def send_image_file(
@ -359,7 +419,7 @@ class MattermostAdapter(BasePlatformAdapter):
) -> SendResult:
"""Upload a local image file."""
return await self._send_local_file(
chat_id, image_path, caption, reply_to
chat_id, image_path, caption, reply_to, metadata=metadata
)
async def send_document(
@ -373,7 +433,7 @@ class MattermostAdapter(BasePlatformAdapter):
) -> SendResult:
"""Upload a local file as a document."""
return await self._send_local_file(
chat_id, file_path, caption, reply_to, file_name
chat_id, file_path, caption, reply_to, file_name, metadata
)
async def send_voice(
@ -386,7 +446,7 @@ class MattermostAdapter(BasePlatformAdapter):
) -> SendResult:
"""Upload an audio file."""
return await self._send_local_file(
chat_id, audio_path, caption, reply_to
chat_id, audio_path, caption, reply_to, metadata=metadata
)
async def send_video(
@ -399,7 +459,7 @@ class MattermostAdapter(BasePlatformAdapter):
) -> SendResult:
"""Upload a video file."""
return await self._send_local_file(
chat_id, video_path, caption, reply_to
chat_id, video_path, caption, reply_to, metadata=metadata
)
def format_message(self, content: str) -> str:
@ -423,12 +483,13 @@ class MattermostAdapter(BasePlatformAdapter):
caption: Optional[str],
reply_to: Optional[str],
kind: str = "file",
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Download a URL and upload it as a file attachment."""
from tools.url_safety import is_safe_url
if not is_safe_url(url):
logger.warning("Mattermost: blocked unsafe URL (SSRF protection)")
return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to)
return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to, metadata=metadata)
import aiohttp
@ -446,7 +507,7 @@ class MattermostAdapter(BasePlatformAdapter):
await asyncio.sleep(1.5 * (attempt + 1))
continue
if resp.status >= 400:
return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to)
return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to, metadata=metadata)
file_data = await resp.read()
ct = resp.content_type or "application/octet-stream"
break
@ -455,25 +516,26 @@ class MattermostAdapter(BasePlatformAdapter):
await asyncio.sleep(1.5 * (attempt + 1))
continue
logger.warning("Mattermost: failed to download %s after %d attempts: %s", url, attempt + 1, exc)
return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to)
return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to, metadata=metadata)
if file_data is None:
logger.warning("Mattermost: download returned no data for %s", url)
return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to)
return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to, metadata=metadata)
file_id = await self._upload_file(chat_id, file_data, fname, ct)
if not file_id:
return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to)
return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to, metadata=metadata)
payload: Dict[str, Any] = {
"channel_id": chat_id,
"message": caption or "",
"file_ids": [file_id],
}
if reply_to and self._reply_mode == "thread":
payload["root_id"] = await self._resolve_root_id(reply_to)
resolved_root = await self._thread_root_for_send(reply_to, metadata)
if resolved_root:
payload["root_id"] = resolved_root
data = await self._api_post("posts", payload)
data = await self._post_preserving_thread(chat_id, payload, metadata)
if not data or "id" not in data:
return SendResult(success=False, error="Failed to post with file")
return SendResult(success=True, message_id=data["id"])
@ -485,6 +547,7 @@ class MattermostAdapter(BasePlatformAdapter):
caption: Optional[str],
reply_to: Optional[str],
file_name: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Upload a local file and attach it to a post."""
import mimetypes
@ -509,10 +572,11 @@ class MattermostAdapter(BasePlatformAdapter):
"message": caption or "",
"file_ids": [file_id],
}
if reply_to and self._reply_mode == "thread":
payload["root_id"] = await self._resolve_root_id(reply_to)
resolved_root = await self._thread_root_for_send(reply_to, metadata)
if resolved_root:
payload["root_id"] = resolved_root
data = await self._api_post("posts", payload)
data = await self._post_preserving_thread(chat_id, payload, metadata)
if not data or "id" not in data:
return SendResult(success=False, error="Failed to post with file")
return SendResult(success=True, message_id=data["id"])
@ -596,11 +660,14 @@ class MattermostAdapter(BasePlatformAdapter):
"message": "\n".join(caption_parts),
"file_ids": file_ids,
}
resolved_root = await self._thread_root_for_send(None, metadata)
if resolved_root:
payload["root_id"] = resolved_root
logger.info(
"Mattermost: sending %d image(s) as single post (chunk %d/%d)",
len(file_ids), chunk_idx + 1, len(chunks),
)
data = await self._api_post("posts", payload)
data = await self._post_preserving_thread(chat_id, payload, metadata)
if not data or "id" not in data:
logger.warning("Mattermost: multi-image post failed, falling back")
await super().send_multiple_images(chat_id, chunk, metadata, human_delay=human_delay)
@ -786,8 +853,16 @@ class MattermostAdapter(BasePlatformAdapter):
sender_id = post.get("user_id", "")
sender_name = data.get("sender_name", "").lstrip("@") or sender_id
# Thread support: if the post is in a thread, use root_id.
# Thread support: if the post is in a thread, use root_id. In
# thread mode, top-level channel posts are valid roots for progress.
thread_id = post.get("root_id") or None
if (
not thread_id
and self._reply_mode == "thread"
and channel_type_raw != "D"
and post_id
):
thread_id = post_id
# Determine message type.
file_ids = post.get("file_ids") or []
@ -849,6 +924,7 @@ class MattermostAdapter(BasePlatformAdapter):
user_id=sender_id,
user_name=sender_name,
thread_id=thread_id,
message_id=post_id,
)
# Per-channel ephemeral prompt

View file

@ -6,6 +6,30 @@ import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from gateway.config import Platform, PlatformConfig
from gateway.run import _resolve_progress_thread_id
class TestMattermostProgressThreadRouting:
def test_top_level_mattermost_progress_uses_event_message_id(self):
assert _resolve_progress_thread_id(
Platform.MATTERMOST,
source_thread_id=None,
event_message_id="top_post_123",
) == "top_post_123"
def test_threaded_mattermost_progress_prefers_existing_thread_root(self):
assert _resolve_progress_thread_id(
Platform.MATTERMOST,
source_thread_id="root_post_123",
event_message_id="reply_post_456",
) == "root_post_123"
def test_telegram_progress_does_not_use_message_id_as_thread_id(self):
assert _resolve_progress_thread_id(
Platform.TELEGRAM,
source_thread_id=None,
event_message_id="12345",
) is None
# ---------------------------------------------------------------------------
@ -237,6 +261,92 @@ class TestMattermostSend:
payload = self.adapter._session.post.call_args[1]["json"]
assert "root_id" not in payload
@pytest.mark.asyncio
async def test_send_uses_metadata_thread_id_for_progress_messages(self):
"""Progress/status messages pass Mattermost thread context via metadata."""
self.adapter._reply_mode = "thread"
self.adapter._api_get = AsyncMock(return_value={"id": "root_post_123", "root_id": ""})
self.adapter._api_post = AsyncMock(return_value={"id": "progress_post"})
result = await self.adapter.send(
"channel_1",
"⚡ terminal...",
metadata={"thread_id": "root_post_123"},
)
assert result.success is True
payload = self.adapter._api_post.call_args_list[0][0][1]
assert payload["root_id"] == "root_post_123"
@pytest.mark.asyncio
async def test_progress_send_with_invalid_thread_root_never_falls_back_flat(self):
"""Tool/status/progress bubbles must stay quiet when the thread is broken."""
self.adapter._reply_mode = "thread"
self.adapter._api_get = AsyncMock(return_value={"id": "bad_root", "root_id": ""})
self.adapter._last_post_status = 400
self.adapter._last_post_error = "api.context.invalid_param.app_error: invalid root_id"
self.adapter._api_post = AsyncMock(return_value={})
result = await self.adapter.send(
"channel_1",
"⚙️ terminal...",
metadata={"thread_id": "bad_root"},
)
assert result.success is False
assert self.adapter._api_post.call_count == 1
payload = self.adapter._api_post.call_args_list[0][0][1]
assert payload["root_id"] == "bad_root"
@pytest.mark.asyncio
async def test_notify_send_with_invalid_thread_root_falls_back_flat_with_warning(self):
"""Notify-worthy replies may fall back flat so the answer is not lost."""
self.adapter._reply_mode = "thread"
self.adapter._api_get = AsyncMock(return_value={"id": "bad_root", "root_id": ""})
self.adapter._last_post_status = 400
self.adapter._last_post_error = "api.context.invalid_param.app_error: invalid root_id"
self.adapter._api_post = AsyncMock(side_effect=[{}, {"id": "flat_final"}])
result = await self.adapter.send(
"channel_1",
"Final answer body",
reply_to="bad_root",
metadata={"notify": True},
)
assert result.success is True
assert result.message_id == "flat_final"
assert self.adapter._api_post.call_count == 2
threaded_payload = self.adapter._api_post.call_args_list[0][0][1]
flat_payload = self.adapter._api_post.call_args_list[1][0][1]
assert threaded_payload["root_id"] == "bad_root"
assert "root_id" not in flat_payload
assert flat_payload["channel_id"] == "channel_1"
assert "Mattermost thread delivery failed" in flat_payload["message"]
assert "Final answer body" in flat_payload["message"]
@pytest.mark.asyncio
async def test_notify_send_with_server_error_does_not_fall_back_flat(self):
"""Notify fallback is only for broken thread roots, not generic API failures."""
self.adapter._reply_mode = "thread"
self.adapter._api_get = AsyncMock(return_value={"id": "root_post", "root_id": ""})
self.adapter._last_post_status = 500
self.adapter._last_post_error = "Internal Server Error"
self.adapter._api_post = AsyncMock(return_value={})
result = await self.adapter.send(
"channel_1",
"Final answer body",
reply_to="root_post",
metadata={"notify": True},
)
assert result.success is False
assert self.adapter._api_post.call_count == 1
payload = self.adapter._api_post.call_args_list[0][0][1]
assert payload["root_id"] == "root_post"
@pytest.mark.asyncio
async def test_send_api_failure(self):
"""When API returns error, send should return failure."""
@ -750,3 +860,65 @@ class TestMattermostMediaTypes:
assert msg.media_types == ["application/pdf"]
assert not msg.media_types[0].startswith("image/")
assert not msg.media_types[0].startswith("audio/")
@pytest.mark.asyncio
async def test_mattermost_top_level_channel_post_is_thread_root():
adapter = _make_adapter()
adapter._reply_mode = "thread"
adapter._bot_user_id = "bot_user_id"
adapter._bot_username = "hermes-bot"
adapter.handle_message = AsyncMock()
post_data = {
"id": "top_post_123",
"user_id": "user_123",
"channel_id": "chan_456",
"message": "@hermes-bot start work",
"root_id": "",
}
event = {
"event": "posted",
"data": {
"post": json.dumps(post_data),
"channel_type": "O",
"sender_name": "@alice",
},
}
await adapter._handle_ws_event(event)
msg_event = adapter.handle_message.call_args[0][0]
assert msg_event.source.thread_id == "top_post_123"
assert msg_event.source.message_id == "top_post_123"
assert msg_event.message_id == "top_post_123"
@pytest.mark.asyncio
async def test_mattermost_dm_post_does_not_seed_thread_root():
adapter = _make_adapter()
adapter._reply_mode = "thread"
adapter._bot_user_id = "bot_user_id"
adapter._bot_username = "hermes-bot"
adapter.handle_message = AsyncMock()
post_data = {
"id": "dm_post_123",
"user_id": "user_123",
"channel_id": "dm_chan",
"message": "hello",
"root_id": "",
}
event = {
"event": "posted",
"data": {
"post": json.dumps(post_data),
"channel_type": "D",
"sender_name": "@alice",
},
}
await adapter._handle_ws_event(event)
msg_event = adapter.handle_message.call_args[0][0]
assert msg_event.source.thread_id is None
assert msg_event.source.message_id == "dm_post_123"