fix(mattermost): harden delivery hygiene

PROBLEM: Mattermost threads can become invalid or enormous, exposing two failure modes: internal scratch/reasoning/commentary displays could leak into persistent Mattermost threads via global display toggles, while rejected threaded user-visible replies could disappear unless every failed send fell back flat. A broad flat fallback would pollute channels with tool/status/progress noise.

SOLUTION: Require explicit Mattermost platform opt-in for scratch displays, keep using the existing notify=True metadata marker for user-visible final text/media/file replies, and allow the Mattermost plugin adapter to flat-fallback only notify-worthy sends whose threaded POST failure looks like a broken root/thread. Keep tool/status/progress and other non-notify sends thread-strict. Add regression tests for display opt-in, notify-only broken-thread fallback, generic API failure suppression, and stream notify metadata.

Verification: tests/gateway/test_mattermost.py tests/gateway/test_stream_consumer.py tests/gateway/test_stream_consumer_thread_routing.py tests/gateway/test_stream_consumer_fresh_final.py tests/gateway/test_stream_consumer_draft.py; tests/gateway/test_session_api.py tests/gateway/test_status_command.py tests/gateway/test_resume_command.py tests/hermes_cli/test_commands.py; py_compile touched gateway files; git diff --check.

Session: Mattermost thread 6qg8e9dd1pd9pkhi74xyaa1mry, 2026-06-01.
This commit is contained in:
Wolfram Ravenwolf 2026-06-01 04:09:59 +02:00 committed by Teknium
parent 925b0d1ab5
commit 16fc717091
7 changed files with 329 additions and 53 deletions

View file

@ -77,6 +77,13 @@ def _thread_metadata_for_source(source, reply_to_message_id: str | None = None)
return metadata
def _mark_notify_metadata(metadata: dict | None) -> dict:
"""Clone metadata and mark a user-visible reply as notify-worthy."""
notify_metadata = dict(metadata) if metadata else {}
notify_metadata["notify"] = True
return notify_metadata
def _reply_anchor_for_event(event) -> str | None:
"""Return reply_to id for platforms that need reply semantics.
@ -3889,7 +3896,7 @@ class BasePlatformAdapter(ABC):
chat_id=event.source.chat_id,
content=_text,
reply_to=_reply_anchor_for_event(event),
metadata=thread_meta,
metadata=_mark_notify_metadata(thread_meta),
)
if _eph_ttl > 0 and _r.success and _r.message_id:
self._schedule_ephemeral_delete(
@ -3995,7 +4002,7 @@ class BasePlatformAdapter(ABC):
chat_id=event.source.chat_id,
content=_text,
reply_to=_reply_anchor_for_event(event),
metadata=_thread_meta,
metadata=_mark_notify_metadata(_thread_meta),
)
if _eph_ttl > 0 and _r.success and _r.message_id:
self._schedule_ephemeral_delete(
@ -4045,7 +4052,7 @@ class BasePlatformAdapter(ABC):
chat_id=event.source.chat_id,
content=_text,
reply_to=_reply_anchor_for_event(event),
metadata=_thread_meta,
metadata=_mark_notify_metadata(_thread_meta),
)
if _eph_ttl > 0 and _r.success and _r.message_id:
self._schedule_ephemeral_delete(
@ -4268,6 +4275,12 @@ class BasePlatformAdapter(ABC):
)
text_content = _recovered
# Final user-visible content (text, TTS, media, files) gets
# the existing notify=True marker. Clone once so typing/status
# metadata stays unmarked and progress bubbles remain
# thread-strict.
_final_thread_metadata = _mark_notify_metadata(_thread_metadata)
# Auto-TTS: if voice message, generate audio FIRST (before sending text)
# Gated via ``_should_auto_tts_for_chat``: fires when the chat has
# an explicit ``/voice on|tts`` opt-in OR when ``voice.auto_tts`` is
@ -4307,7 +4320,7 @@ class BasePlatformAdapter(ABC):
chat_id=event.source.chat_id,
audio_path=_tts_path,
caption=telegram_tts_caption,
metadata=_thread_metadata,
metadata=_final_thread_metadata,
)
_tts_caption_delivered = bool(
telegram_tts_caption and getattr(tts_result, "success", False)
@ -4322,23 +4335,11 @@ class BasePlatformAdapter(ABC):
if text_content and not _tts_caption_delivered:
logger.info("[%s] Sending response (%d chars) to %s", self.name, len(text_content), event.source.chat_id)
_reply_anchor = _reply_anchor_for_event(event)
# Mark final response messages for notification delivery.
# Platform adapters that support per-message notification
# control (e.g. Telegram's disable_notification) use this
# flag to override silent-mode and ensure the final
# response triggers a push notification.
# Clone to avoid mutating the metadata shared with the
# typing-indicator task (which must remain unmarked).
if _thread_metadata is not None:
_thread_metadata = dict(_thread_metadata)
_thread_metadata["notify"] = True
else:
_thread_metadata = {"notify": True}
result = await self._send_with_retry(
chat_id=event.source.chat_id,
content=text_content,
reply_to=_reply_anchor,
metadata=_thread_metadata,
metadata=_final_thread_metadata,
)
_record_delivery(result)
@ -4367,7 +4368,7 @@ class BasePlatformAdapter(ABC):
await self.send_multiple_images(
chat_id=event.source.chat_id,
images=images,
metadata=_thread_metadata,
metadata=_final_thread_metadata,
human_delay=human_delay,
)
except Exception as batch_err:
@ -4409,7 +4410,7 @@ class BasePlatformAdapter(ABC):
await self.send_multiple_images(
chat_id=event.source.chat_id,
images=_batch,
metadata=_thread_metadata,
metadata=_final_thread_metadata,
human_delay=human_delay,
)
except Exception as batch_err:
@ -4424,19 +4425,19 @@ class BasePlatformAdapter(ABC):
media_result = await self.send_voice(
chat_id=event.source.chat_id,
audio_path=media_path,
metadata=_thread_metadata,
metadata=_final_thread_metadata,
)
elif ext in _VIDEO_EXTS:
media_result = await self.send_video(
chat_id=event.source.chat_id,
video_path=media_path,
metadata=_thread_metadata,
metadata=_final_thread_metadata,
)
else:
media_result = await self.send_document(
chat_id=event.source.chat_id,
file_path=media_path,
metadata=_thread_metadata,
metadata=_final_thread_metadata,
)
if not media_result.success:
@ -4454,13 +4455,13 @@ class BasePlatformAdapter(ABC):
await self.send_video(
chat_id=event.source.chat_id,
video_path=file_path,
metadata=_thread_metadata,
metadata=_final_thread_metadata,
)
else:
await self.send_document(
chat_id=event.source.chat_id,
file_path=file_path,
metadata=_thread_metadata,
metadata=_final_thread_metadata,
)
except Exception as file_err:
logger.error("[%s] Error sending local file %s: %s", self.name, file_path, file_err)

View file

@ -413,6 +413,57 @@ def _resolve_progress_thread_id(platform: Any, source_thread_id: Any, event_mess
return None
def _has_platform_display_override(user_config: dict, platform_key: str, setting: str) -> bool:
"""Return True when display.platforms.<platform> explicitly sets setting."""
display = user_config.get("display") if isinstance(user_config, dict) else None
if not isinstance(display, dict):
return False
platforms = display.get("platforms")
if not isinstance(platforms, dict):
return False
platform_cfg = platforms.get(platform_key)
return isinstance(platform_cfg, dict) and setting in platform_cfg
def _resolve_gateway_display_bool(
user_config: dict,
platform_key: str,
setting: str,
*,
default: bool = False,
platform: Any = None,
require_platform_override_for: set[Any] | None = None,
) -> bool:
"""Resolve a boolean display setting with optional platform-only opt-in.
Some display features expose assistant scratch text rather than deliberate
user-facing output. For high-noise threaded chat surfaces such as
Mattermost, a global opt-in is too broad: they must be enabled with an
explicit display.platforms.<platform>.<setting> override.
"""
current_platform = _gateway_platform_value(platform or platform_key)
platform_only = {
_gateway_platform_value(candidate)
for candidate in (require_platform_override_for or set())
}
if (
current_platform in platform_only
and not _has_platform_display_override(user_config, platform_key, setting)
):
return False
from gateway.display_config import resolve_display_setting
value = resolve_display_setting(user_config, platform_key, setting, default)
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() in {"true", "yes", "1", "on"}
if value is None:
return bool(default)
return bool(value)
def _telegramize_command_mentions(text: str, platform: Any) -> str:
"""Rewrite slash-command mentions to Telegram-valid command names.
@ -8989,17 +9040,24 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
source, session_entry, reason="agent-result-compression",
)
# Prepend reasoning/thinking if display is enabled (per-platform)
# Prepend reasoning/thinking if display is enabled (per-platform).
# Mattermost requires explicit per-platform opt-in because this is
# scratch text, not ordinary final-answer content.
try:
from gateway.display_config import resolve_display_setting as _rds
_show_reasoning_effective = _rds(
_show_reasoning_effective = _resolve_gateway_display_bool(
_load_gateway_config(),
_platform_config_key(source.platform),
"show_reasoning",
getattr(self, "_show_reasoning", False),
default=bool(getattr(self, "_show_reasoning", False)),
platform=source.platform,
require_platform_override_for={Platform.MATTERMOST},
)
except Exception:
_show_reasoning_effective = getattr(self, "_show_reasoning", False)
_show_reasoning_effective = (
False
if source.platform == Platform.MATTERMOST
else getattr(self, "_show_reasoning", False)
)
if _show_reasoning_effective and response and not _intentional_silence:
last_reasoning = agent_result.get("last_reasoning")
if last_reasoning:
@ -13635,18 +13693,32 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
# in chat platforms while opting into concise mid-turn updates.
interim_assistant_messages_enabled = (
source.platform != Platform.WEBHOOK
and bool(
resolve_display_setting(
user_config,
platform_key,
"interim_assistant_messages",
True,
)
and _resolve_gateway_display_bool(
user_config,
platform_key,
"interim_assistant_messages",
default=True,
platform=source.platform,
require_platform_override_for={Platform.MATTERMOST},
)
)
# thinking_progress is independent — if enabled, we need the progress
# queue even when tool_progress is off (thinking relay uses same infra).
# Mattermost requires a per-platform opt-in: global scratch-text display
# is too easy to leak into busy public threads.
_thinking_enabled = _resolve_gateway_display_bool(
user_config,
platform_key,
"thinking_progress",
default=False,
platform=source.platform,
require_platform_override_for={Platform.MATTERMOST},
)
needs_progress_queue = tool_progress_enabled or _thinking_enabled
# Queue for progress messages (thread-safe)
progress_queue = queue.Queue() if tool_progress_enabled else None
progress_queue = queue.Queue() if needs_progress_queue else None
last_tool = [None] # Mutable container for tracking in closure
last_progress_msg = [None] # Track last message for dedup
repeat_count = [0] # How many times the same message repeated
@ -13752,6 +13824,24 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
logger.debug("tool-progress onboarding hint failed: %s", _hint_err)
return
# "_thinking" is assistant scratch text between tool calls. It
# is never ordinary tool progress: only relay it when the platform
# explicitly opted into thinking_progress. Handle both legacy
# callback shapes: ("_thinking", text) and
# ("reasoning.available", "_thinking", text, ...).
if event_type == "_thinking" or tool_name == "_thinking":
if not _thinking_enabled:
return
thinking_text = preview if tool_name == "_thinking" else tool_name
msg = f"💬 {thinking_text}" if thinking_text else None
if msg:
progress_queue.put(msg)
return
# If tool_progress is off, only _thinking passes through (above).
# Regular tool calls are suppressed.
if not tool_progress_enabled:
return
# Only act on tool.started events (ignore tool.completed, reasoning.available, etc.)
if event_type not in {"tool.started",}:
@ -14783,6 +14873,10 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
agent.clarify_callback = _clarify_callback_sync
# Show assistant thinking between tool calls — independent of
# tool_progress mode. Mattermost needs an explicit per-platform
# opt-in so global scratch-text display does not leak into threads.
agent.thinking_progress = _thinking_enabled
# Store agent reference for interrupt support
agent_holder[0] = agent
# Capture the full tool definitions for transcript logging

View file

@ -197,6 +197,30 @@ class GatewayStreamConsumer:
# this response and route through edit-based for graceful degradation.
self._draft_failures = 0
def _metadata_for_send(
self,
*,
final: bool = False,
expect_edits: bool = False,
) -> dict | None:
"""Return per-send metadata for stream-created messages.
Mattermost treats notify-worthy sends as user-visible final content
when deciding whether a broken thread root may fall back flat. Preview
and progress sends keep their original metadata and remain thread-strict.
``expect_edits`` preserves the upstream Telegram streaming contract:
preview messages that may be edited later must stay on the editable
legacy send path, while fresh/fallback final sends can still use richer
final-message delivery.
"""
meta = dict(self.metadata) if self.metadata else {}
if expect_edits:
meta["expect_edits"] = True
if final:
meta["notify"] = True
return meta or None
@property
def already_sent(self) -> bool:
"""True if at least one message was sent or edited during the run."""
@ -513,7 +537,11 @@ class GatewayStreamConsumer:
chunks_delivered = False
reply_to = self._message_id or self._initial_reply_to_id
for chunk in chunks:
new_id = await self._send_new_chunk(chunk, reply_to)
new_id = await self._send_new_chunk(
chunk,
reply_to,
final=got_done,
)
if new_id is not None and new_id != reply_to:
chunks_delivered = True
self._accumulated = ""
@ -749,7 +777,13 @@ class GatewayStreamConsumer:
# Strip trailing whitespace/newlines but preserve leading content
return cleaned.rstrip()
async def _send_new_chunk(self, text: str, reply_to_id: Optional[str]) -> Optional[str]:
async def _send_new_chunk(
self,
text: str,
reply_to_id: Optional[str],
*,
final: bool = False,
) -> Optional[str]:
"""Send a new message chunk, optionally threaded to a previous message.
Returns the message_id so callers can thread subsequent chunks.
@ -758,15 +792,11 @@ class GatewayStreamConsumer:
if not text.strip():
return reply_to_id
try:
meta = dict(self.metadata) if self.metadata else {}
# This chunk becomes the next edit target — adapters that support
# rich final sends (Telegram) must keep it on the editable path.
meta["expect_edits"] = True
result = await self.adapter.send(
chat_id=self.chat_id,
content=text,
reply_to=reply_to_id,
metadata=meta,
metadata=self._metadata_for_send(final=final, expect_edits=True),
)
if result.success and result.message_id:
self._message_id = str(result.message_id)
@ -885,7 +915,7 @@ class GatewayStreamConsumer:
result = await self.adapter.send(
chat_id=self.chat_id,
content=chunk,
metadata=self.metadata,
metadata=self._metadata_for_send(final=True),
)
if result.success:
break
@ -1242,7 +1272,7 @@ class GatewayStreamConsumer:
result = await self.adapter.send(
chat_id=self.chat_id,
content=text,
metadata=self.metadata,
metadata=self._metadata_for_send(final=True),
)
except Exception as e:
logger.debug("Fresh-final send failed, falling back to edit: %s", e)
@ -1532,7 +1562,10 @@ class GatewayStreamConsumer:
chat_id=self.chat_id,
content=text,
reply_to=self._initial_reply_to_id,
metadata={**(self.metadata or {}), "expect_edits": True},
metadata=self._metadata_for_send(
final=finalize,
expect_edits=True,
),
)
if result.success:
if result.message_id:

View file

@ -6,7 +6,10 @@ import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from gateway.config import Platform, PlatformConfig
from gateway.run import _resolve_progress_thread_id
from gateway.run import (
_resolve_gateway_display_bool,
_resolve_progress_thread_id,
)
class TestMattermostProgressThreadRouting:
@ -32,6 +35,97 @@ class TestMattermostProgressThreadRouting:
) is None
class TestMattermostDisplayHygiene:
def test_mattermost_requires_platform_opt_in_for_interim_assistant_messages(self):
"""Global interim commentary must not make Mattermost leak scratch notes."""
user_config = {"display": {"interim_assistant_messages": True}}
assert _resolve_gateway_display_bool(
user_config,
"mattermost",
"interim_assistant_messages",
default=True,
platform=Platform.MATTERMOST,
require_platform_override_for={Platform.MATTERMOST},
) is False
def test_mattermost_platform_opt_in_can_enable_interim_assistant_messages(self):
"""Mattermost can still opt into commentary explicitly per platform."""
user_config = {
"display": {
"interim_assistant_messages": False,
"platforms": {
"mattermost": {"interim_assistant_messages": True},
},
}
}
assert _resolve_gateway_display_bool(
user_config,
"mattermost",
"interim_assistant_messages",
default=True,
platform=Platform.MATTERMOST,
require_platform_override_for={Platform.MATTERMOST},
) is True
def test_mattermost_requires_platform_opt_in_for_thinking_progress(self):
"""Global thinking_progress must not surface internal analysis in Mattermost."""
user_config = {"display": {"thinking_progress": True}}
assert _resolve_gateway_display_bool(
user_config,
"mattermost",
"thinking_progress",
default=False,
platform=Platform.MATTERMOST,
require_platform_override_for={Platform.MATTERMOST},
) is False
def test_mattermost_requires_platform_opt_in_for_show_reasoning(self):
"""Global show_reasoning must not prepend scratch reasoning in Mattermost."""
user_config = {"display": {"show_reasoning": True}}
assert _resolve_gateway_display_bool(
user_config,
"mattermost",
"show_reasoning",
default=False,
platform=Platform.MATTERMOST,
require_platform_override_for={Platform.MATTERMOST},
) is False
def test_mattermost_platform_opt_in_can_enable_show_reasoning(self):
user_config = {
"display": {
"show_reasoning": False,
"platforms": {"mattermost": {"show_reasoning": True}},
}
}
assert _resolve_gateway_display_bool(
user_config,
"mattermost",
"show_reasoning",
default=False,
platform=Platform.MATTERMOST,
require_platform_override_for={Platform.MATTERMOST},
) is True
def test_global_thinking_progress_still_applies_to_other_platforms(self):
"""The Mattermost guard must not silently neuter Telegram/other chats."""
user_config = {"display": {"thinking_progress": True}}
assert _resolve_gateway_display_bool(
user_config,
"telegram",
"thinking_progress",
default=False,
platform=Platform.TELEGRAM,
require_platform_override_for={Platform.MATTERMOST},
) is True
# ---------------------------------------------------------------------------
# Platform & Config
# ---------------------------------------------------------------------------
@ -347,6 +441,24 @@ class TestMattermostSend:
payload = self.adapter._api_post.call_args_list[0][0][1]
assert payload["root_id"] == "root_post"
@pytest.mark.asyncio
async def test_progress_send_with_invalid_thread_root_never_falls_back_flat(self):
"""Tool/status/progress bubbles must stay quiet when the thread is broken."""
self.adapter._reply_mode = "thread"
self.adapter._api_get = AsyncMock(return_value={"id": "bad_root", "root_id": ""})
self.adapter._api_post = AsyncMock(return_value={})
result = await self.adapter.send(
"channel_1",
"⚙️ terminal...",
metadata={"thread_id": "bad_root"},
)
assert result.success is False
assert self.adapter._api_post.call_count == 1
payload = self.adapter._api_post.call_args_list[0][0][1]
assert payload["root_id"] == "bad_root"
@pytest.mark.asyncio
async def test_send_api_failure(self):
"""When API returns error, send should return failure."""

View file

@ -106,6 +106,42 @@ class TestInitialReplyToId:
assert call_kwargs["metadata"] == {**metadata, "expect_edits": True}
assert metadata == {"thread_id": "omt_topic789"}
@pytest.mark.asyncio
async def test_final_first_send_marks_metadata_notify_true(self):
"""Final streaming sends should use the existing notify=True marker."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter,
"chat_123",
metadata={"thread_id": "root_post_123"},
initial_reply_to_id="reply_post_456",
)
await consumer._send_or_edit("Final answer", finalize=True)
call_kwargs = adapter.send.call_args[1]
metadata = call_kwargs["metadata"]
assert metadata["thread_id"] == "root_post_123"
assert metadata["notify"] is True
assert "delivery_kind" not in metadata
assert "allow_flat_fallback" not in metadata
@pytest.mark.asyncio
async def test_nonfinal_first_send_does_not_mark_notify(self):
"""Preview/interim streaming sends must not be notify-worthy."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter,
"chat_123",
metadata={"thread_id": "root_post_123"},
initial_reply_to_id="reply_post_456",
)
await consumer._send_or_edit("Preview", finalize=False)
metadata = adapter.send.call_args[1]["metadata"]
assert metadata == {"thread_id": "root_post_123", "expect_edits": True}
class TestOverflowFirstMessage:
"""Verify thread routing is preserved when the first message overflows."""

View file

@ -134,7 +134,7 @@ async def test_stream_consumer_fallback_sends_tail_after_partial_overflow():
adapter.send.assert_awaited_once()
assert adapter.send.await_args.kwargs["content"] == "world"
assert adapter.send.await_args.kwargs["metadata"] == {"thread_id": "77"}
assert adapter.send.await_args.kwargs["metadata"] == {"thread_id": "77", "notify": True}
adapter.delete_message.assert_not_awaited()
assert consumer.final_response_sent is True
assert consumer.final_content_delivered is True

View file

@ -76,7 +76,7 @@ async def test_base_adapter_routes_telegram_flac_media_tag_to_document_sender(tm
adapter.send_document.assert_awaited_once_with(
chat_id="chat-1",
file_path=str(media_file),
metadata=None,
metadata={"notify": True},
)
adapter.send_voice.assert_not_awaited()
@ -95,7 +95,7 @@ async def test_base_adapter_routes_non_voice_telegram_ogg_media_tag_to_document_
adapter.send_document.assert_awaited_once_with(
chat_id="chat-1",
file_path=str(media_file),
metadata=None,
metadata={"notify": True},
)
adapter.send_voice.assert_not_awaited()
@ -116,7 +116,7 @@ async def test_base_adapter_routes_voice_tagged_telegram_ogg_media_tag_to_voice_
adapter.send_voice.assert_awaited_once_with(
chat_id="chat-1",
audio_path=str(media_file),
metadata=None,
metadata={"notify": True},
)
adapter.send_document.assert_not_awaited()