From 16fc7170911470f8bc01c4e87737dc309bef0deb Mon Sep 17 00:00:00 2001 From: Wolfram Ravenwolf Date: Mon, 1 Jun 2026 04:09:59 +0200 Subject: [PATCH] fix(mattermost): harden delivery hygiene PROBLEM: Mattermost threads can become invalid or enormous, exposing two failure modes: internal scratch/reasoning/commentary displays could leak into persistent Mattermost threads via global display toggles, while rejected threaded user-visible replies could disappear unless every failed send fell back flat. A broad flat fallback would pollute channels with tool/status/progress noise. SOLUTION: Require explicit Mattermost platform opt-in for scratch displays, keep using the existing notify=True metadata marker for user-visible final text/media/file replies, and allow the Mattermost plugin adapter to flat-fallback only notify-worthy sends whose threaded POST failure looks like a broken root/thread. Keep tool/status/progress and other non-notify sends thread-strict. Add regression tests for display opt-in, notify-only broken-thread fallback, generic API failure suppression, and stream notify metadata. Verification: tests/gateway/test_mattermost.py tests/gateway/test_stream_consumer.py tests/gateway/test_stream_consumer_thread_routing.py tests/gateway/test_stream_consumer_fresh_final.py tests/gateway/test_stream_consumer_draft.py; tests/gateway/test_session_api.py tests/gateway/test_status_command.py tests/gateway/test_resume_command.py tests/hermes_cli/test_commands.py; py_compile touched gateway files; git diff --check. Session: Mattermost thread 6qg8e9dd1pd9pkhi74xyaa1mry, 2026-06-01. --- gateway/platforms/base.py | 49 +++---- gateway/run.py | 122 ++++++++++++++++-- gateway/stream_consumer.py | 53 ++++++-- tests/gateway/test_mattermost.py | 114 +++++++++++++++- .../test_stream_consumer_thread_routing.py | 36 ++++++ .../gateway/test_telegram_overflow_partial.py | 2 +- tests/gateway/test_tts_media_routing.py | 6 +- 7 files changed, 329 insertions(+), 53 deletions(-) diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 205d9cbf509..cda3acc6e58 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -77,6 +77,13 @@ def _thread_metadata_for_source(source, reply_to_message_id: str | None = None) return metadata +def _mark_notify_metadata(metadata: dict | None) -> dict: + """Clone metadata and mark a user-visible reply as notify-worthy.""" + notify_metadata = dict(metadata) if metadata else {} + notify_metadata["notify"] = True + return notify_metadata + + def _reply_anchor_for_event(event) -> str | None: """Return reply_to id for platforms that need reply semantics. @@ -3889,7 +3896,7 @@ class BasePlatformAdapter(ABC): chat_id=event.source.chat_id, content=_text, reply_to=_reply_anchor_for_event(event), - metadata=thread_meta, + metadata=_mark_notify_metadata(thread_meta), ) if _eph_ttl > 0 and _r.success and _r.message_id: self._schedule_ephemeral_delete( @@ -3995,7 +4002,7 @@ class BasePlatformAdapter(ABC): chat_id=event.source.chat_id, content=_text, reply_to=_reply_anchor_for_event(event), - metadata=_thread_meta, + metadata=_mark_notify_metadata(_thread_meta), ) if _eph_ttl > 0 and _r.success and _r.message_id: self._schedule_ephemeral_delete( @@ -4045,7 +4052,7 @@ class BasePlatformAdapter(ABC): chat_id=event.source.chat_id, content=_text, reply_to=_reply_anchor_for_event(event), - metadata=_thread_meta, + metadata=_mark_notify_metadata(_thread_meta), ) if _eph_ttl > 0 and _r.success and _r.message_id: self._schedule_ephemeral_delete( @@ -4268,6 +4275,12 @@ class BasePlatformAdapter(ABC): ) text_content = _recovered + # Final user-visible content (text, TTS, media, files) gets + # the existing notify=True marker. Clone once so typing/status + # metadata stays unmarked and progress bubbles remain + # thread-strict. + _final_thread_metadata = _mark_notify_metadata(_thread_metadata) + # Auto-TTS: if voice message, generate audio FIRST (before sending text) # Gated via ``_should_auto_tts_for_chat``: fires when the chat has # an explicit ``/voice on|tts`` opt-in OR when ``voice.auto_tts`` is @@ -4307,7 +4320,7 @@ class BasePlatformAdapter(ABC): chat_id=event.source.chat_id, audio_path=_tts_path, caption=telegram_tts_caption, - metadata=_thread_metadata, + metadata=_final_thread_metadata, ) _tts_caption_delivered = bool( telegram_tts_caption and getattr(tts_result, "success", False) @@ -4322,23 +4335,11 @@ class BasePlatformAdapter(ABC): if text_content and not _tts_caption_delivered: logger.info("[%s] Sending response (%d chars) to %s", self.name, len(text_content), event.source.chat_id) _reply_anchor = _reply_anchor_for_event(event) - # Mark final response messages for notification delivery. - # Platform adapters that support per-message notification - # control (e.g. Telegram's disable_notification) use this - # flag to override silent-mode and ensure the final - # response triggers a push notification. - # Clone to avoid mutating the metadata shared with the - # typing-indicator task (which must remain unmarked). - if _thread_metadata is not None: - _thread_metadata = dict(_thread_metadata) - _thread_metadata["notify"] = True - else: - _thread_metadata = {"notify": True} result = await self._send_with_retry( chat_id=event.source.chat_id, content=text_content, reply_to=_reply_anchor, - metadata=_thread_metadata, + metadata=_final_thread_metadata, ) _record_delivery(result) @@ -4367,7 +4368,7 @@ class BasePlatformAdapter(ABC): await self.send_multiple_images( chat_id=event.source.chat_id, images=images, - metadata=_thread_metadata, + metadata=_final_thread_metadata, human_delay=human_delay, ) except Exception as batch_err: @@ -4409,7 +4410,7 @@ class BasePlatformAdapter(ABC): await self.send_multiple_images( chat_id=event.source.chat_id, images=_batch, - metadata=_thread_metadata, + metadata=_final_thread_metadata, human_delay=human_delay, ) except Exception as batch_err: @@ -4424,19 +4425,19 @@ class BasePlatformAdapter(ABC): media_result = await self.send_voice( chat_id=event.source.chat_id, audio_path=media_path, - metadata=_thread_metadata, + metadata=_final_thread_metadata, ) elif ext in _VIDEO_EXTS: media_result = await self.send_video( chat_id=event.source.chat_id, video_path=media_path, - metadata=_thread_metadata, + metadata=_final_thread_metadata, ) else: media_result = await self.send_document( chat_id=event.source.chat_id, file_path=media_path, - metadata=_thread_metadata, + metadata=_final_thread_metadata, ) if not media_result.success: @@ -4454,13 +4455,13 @@ class BasePlatformAdapter(ABC): await self.send_video( chat_id=event.source.chat_id, video_path=file_path, - metadata=_thread_metadata, + metadata=_final_thread_metadata, ) else: await self.send_document( chat_id=event.source.chat_id, file_path=file_path, - metadata=_thread_metadata, + metadata=_final_thread_metadata, ) except Exception as file_err: logger.error("[%s] Error sending local file %s: %s", self.name, file_path, file_err) diff --git a/gateway/run.py b/gateway/run.py index 470d71906c3..b688f3a3613 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -413,6 +413,57 @@ def _resolve_progress_thread_id(platform: Any, source_thread_id: Any, event_mess return None +def _has_platform_display_override(user_config: dict, platform_key: str, setting: str) -> bool: + """Return True when display.platforms. explicitly sets setting.""" + display = user_config.get("display") if isinstance(user_config, dict) else None + if not isinstance(display, dict): + return False + platforms = display.get("platforms") + if not isinstance(platforms, dict): + return False + platform_cfg = platforms.get(platform_key) + return isinstance(platform_cfg, dict) and setting in platform_cfg + + +def _resolve_gateway_display_bool( + user_config: dict, + platform_key: str, + setting: str, + *, + default: bool = False, + platform: Any = None, + require_platform_override_for: set[Any] | None = None, +) -> bool: + """Resolve a boolean display setting with optional platform-only opt-in. + + Some display features expose assistant scratch text rather than deliberate + user-facing output. For high-noise threaded chat surfaces such as + Mattermost, a global opt-in is too broad: they must be enabled with an + explicit display.platforms.. override. + """ + current_platform = _gateway_platform_value(platform or platform_key) + platform_only = { + _gateway_platform_value(candidate) + for candidate in (require_platform_override_for or set()) + } + if ( + current_platform in platform_only + and not _has_platform_display_override(user_config, platform_key, setting) + ): + return False + + from gateway.display_config import resolve_display_setting + + value = resolve_display_setting(user_config, platform_key, setting, default) + if isinstance(value, bool): + return value + if isinstance(value, str): + return value.strip().lower() in {"true", "yes", "1", "on"} + if value is None: + return bool(default) + return bool(value) + + def _telegramize_command_mentions(text: str, platform: Any) -> str: """Rewrite slash-command mentions to Telegram-valid command names. @@ -8989,17 +9040,24 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew source, session_entry, reason="agent-result-compression", ) - # Prepend reasoning/thinking if display is enabled (per-platform) + # Prepend reasoning/thinking if display is enabled (per-platform). + # Mattermost requires explicit per-platform opt-in because this is + # scratch text, not ordinary final-answer content. try: - from gateway.display_config import resolve_display_setting as _rds - _show_reasoning_effective = _rds( + _show_reasoning_effective = _resolve_gateway_display_bool( _load_gateway_config(), _platform_config_key(source.platform), "show_reasoning", - getattr(self, "_show_reasoning", False), + default=bool(getattr(self, "_show_reasoning", False)), + platform=source.platform, + require_platform_override_for={Platform.MATTERMOST}, ) except Exception: - _show_reasoning_effective = getattr(self, "_show_reasoning", False) + _show_reasoning_effective = ( + False + if source.platform == Platform.MATTERMOST + else getattr(self, "_show_reasoning", False) + ) if _show_reasoning_effective and response and not _intentional_silence: last_reasoning = agent_result.get("last_reasoning") if last_reasoning: @@ -13635,18 +13693,32 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # in chat platforms while opting into concise mid-turn updates. interim_assistant_messages_enabled = ( source.platform != Platform.WEBHOOK - and bool( - resolve_display_setting( - user_config, - platform_key, - "interim_assistant_messages", - True, - ) + and _resolve_gateway_display_bool( + user_config, + platform_key, + "interim_assistant_messages", + default=True, + platform=source.platform, + require_platform_override_for={Platform.MATTERMOST}, ) ) - + # thinking_progress is independent — if enabled, we need the progress + # queue even when tool_progress is off (thinking relay uses same infra). + # Mattermost requires a per-platform opt-in: global scratch-text display + # is too easy to leak into busy public threads. + _thinking_enabled = _resolve_gateway_display_bool( + user_config, + platform_key, + "thinking_progress", + default=False, + platform=source.platform, + require_platform_override_for={Platform.MATTERMOST}, + ) + needs_progress_queue = tool_progress_enabled or _thinking_enabled + + # Queue for progress messages (thread-safe) - progress_queue = queue.Queue() if tool_progress_enabled else None + progress_queue = queue.Queue() if needs_progress_queue else None last_tool = [None] # Mutable container for tracking in closure last_progress_msg = [None] # Track last message for dedup repeat_count = [0] # How many times the same message repeated @@ -13752,6 +13824,24 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew logger.debug("tool-progress onboarding hint failed: %s", _hint_err) return + # "_thinking" is assistant scratch text between tool calls. It + # is never ordinary tool progress: only relay it when the platform + # explicitly opted into thinking_progress. Handle both legacy + # callback shapes: ("_thinking", text) and + # ("reasoning.available", "_thinking", text, ...). + if event_type == "_thinking" or tool_name == "_thinking": + if not _thinking_enabled: + return + thinking_text = preview if tool_name == "_thinking" else tool_name + msg = f"💬 {thinking_text}" if thinking_text else None + if msg: + progress_queue.put(msg) + return + + # If tool_progress is off, only _thinking passes through (above). + # Regular tool calls are suppressed. + if not tool_progress_enabled: + return # Only act on tool.started events (ignore tool.completed, reasoning.available, etc.) if event_type not in {"tool.started",}: @@ -14783,6 +14873,10 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew agent.clarify_callback = _clarify_callback_sync + # Show assistant thinking between tool calls — independent of + # tool_progress mode. Mattermost needs an explicit per-platform + # opt-in so global scratch-text display does not leak into threads. + agent.thinking_progress = _thinking_enabled # Store agent reference for interrupt support agent_holder[0] = agent # Capture the full tool definitions for transcript logging diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 7fc9846c49f..f559d7ecd43 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -197,6 +197,30 @@ class GatewayStreamConsumer: # this response and route through edit-based for graceful degradation. self._draft_failures = 0 + def _metadata_for_send( + self, + *, + final: bool = False, + expect_edits: bool = False, + ) -> dict | None: + """Return per-send metadata for stream-created messages. + + Mattermost treats notify-worthy sends as user-visible final content + when deciding whether a broken thread root may fall back flat. Preview + and progress sends keep their original metadata and remain thread-strict. + + ``expect_edits`` preserves the upstream Telegram streaming contract: + preview messages that may be edited later must stay on the editable + legacy send path, while fresh/fallback final sends can still use richer + final-message delivery. + """ + meta = dict(self.metadata) if self.metadata else {} + if expect_edits: + meta["expect_edits"] = True + if final: + meta["notify"] = True + return meta or None + @property def already_sent(self) -> bool: """True if at least one message was sent or edited during the run.""" @@ -513,7 +537,11 @@ class GatewayStreamConsumer: chunks_delivered = False reply_to = self._message_id or self._initial_reply_to_id for chunk in chunks: - new_id = await self._send_new_chunk(chunk, reply_to) + new_id = await self._send_new_chunk( + chunk, + reply_to, + final=got_done, + ) if new_id is not None and new_id != reply_to: chunks_delivered = True self._accumulated = "" @@ -749,7 +777,13 @@ class GatewayStreamConsumer: # Strip trailing whitespace/newlines but preserve leading content return cleaned.rstrip() - async def _send_new_chunk(self, text: str, reply_to_id: Optional[str]) -> Optional[str]: + async def _send_new_chunk( + self, + text: str, + reply_to_id: Optional[str], + *, + final: bool = False, + ) -> Optional[str]: """Send a new message chunk, optionally threaded to a previous message. Returns the message_id so callers can thread subsequent chunks. @@ -758,15 +792,11 @@ class GatewayStreamConsumer: if not text.strip(): return reply_to_id try: - meta = dict(self.metadata) if self.metadata else {} - # This chunk becomes the next edit target — adapters that support - # rich final sends (Telegram) must keep it on the editable path. - meta["expect_edits"] = True result = await self.adapter.send( chat_id=self.chat_id, content=text, reply_to=reply_to_id, - metadata=meta, + metadata=self._metadata_for_send(final=final, expect_edits=True), ) if result.success and result.message_id: self._message_id = str(result.message_id) @@ -885,7 +915,7 @@ class GatewayStreamConsumer: result = await self.adapter.send( chat_id=self.chat_id, content=chunk, - metadata=self.metadata, + metadata=self._metadata_for_send(final=True), ) if result.success: break @@ -1242,7 +1272,7 @@ class GatewayStreamConsumer: result = await self.adapter.send( chat_id=self.chat_id, content=text, - metadata=self.metadata, + metadata=self._metadata_for_send(final=True), ) except Exception as e: logger.debug("Fresh-final send failed, falling back to edit: %s", e) @@ -1532,7 +1562,10 @@ class GatewayStreamConsumer: chat_id=self.chat_id, content=text, reply_to=self._initial_reply_to_id, - metadata={**(self.metadata or {}), "expect_edits": True}, + metadata=self._metadata_for_send( + final=finalize, + expect_edits=True, + ), ) if result.success: if result.message_id: diff --git a/tests/gateway/test_mattermost.py b/tests/gateway/test_mattermost.py index 9b174a5137a..1fedb30a019 100644 --- a/tests/gateway/test_mattermost.py +++ b/tests/gateway/test_mattermost.py @@ -6,7 +6,10 @@ import pytest from unittest.mock import MagicMock, patch, AsyncMock from gateway.config import Platform, PlatformConfig -from gateway.run import _resolve_progress_thread_id +from gateway.run import ( + _resolve_gateway_display_bool, + _resolve_progress_thread_id, +) class TestMattermostProgressThreadRouting: @@ -32,6 +35,97 @@ class TestMattermostProgressThreadRouting: ) is None +class TestMattermostDisplayHygiene: + def test_mattermost_requires_platform_opt_in_for_interim_assistant_messages(self): + """Global interim commentary must not make Mattermost leak scratch notes.""" + user_config = {"display": {"interim_assistant_messages": True}} + + assert _resolve_gateway_display_bool( + user_config, + "mattermost", + "interim_assistant_messages", + default=True, + platform=Platform.MATTERMOST, + require_platform_override_for={Platform.MATTERMOST}, + ) is False + + def test_mattermost_platform_opt_in_can_enable_interim_assistant_messages(self): + """Mattermost can still opt into commentary explicitly per platform.""" + user_config = { + "display": { + "interim_assistant_messages": False, + "platforms": { + "mattermost": {"interim_assistant_messages": True}, + }, + } + } + + assert _resolve_gateway_display_bool( + user_config, + "mattermost", + "interim_assistant_messages", + default=True, + platform=Platform.MATTERMOST, + require_platform_override_for={Platform.MATTERMOST}, + ) is True + + def test_mattermost_requires_platform_opt_in_for_thinking_progress(self): + """Global thinking_progress must not surface internal analysis in Mattermost.""" + user_config = {"display": {"thinking_progress": True}} + + assert _resolve_gateway_display_bool( + user_config, + "mattermost", + "thinking_progress", + default=False, + platform=Platform.MATTERMOST, + require_platform_override_for={Platform.MATTERMOST}, + ) is False + + def test_mattermost_requires_platform_opt_in_for_show_reasoning(self): + """Global show_reasoning must not prepend scratch reasoning in Mattermost.""" + user_config = {"display": {"show_reasoning": True}} + + assert _resolve_gateway_display_bool( + user_config, + "mattermost", + "show_reasoning", + default=False, + platform=Platform.MATTERMOST, + require_platform_override_for={Platform.MATTERMOST}, + ) is False + + def test_mattermost_platform_opt_in_can_enable_show_reasoning(self): + user_config = { + "display": { + "show_reasoning": False, + "platforms": {"mattermost": {"show_reasoning": True}}, + } + } + + assert _resolve_gateway_display_bool( + user_config, + "mattermost", + "show_reasoning", + default=False, + platform=Platform.MATTERMOST, + require_platform_override_for={Platform.MATTERMOST}, + ) is True + + def test_global_thinking_progress_still_applies_to_other_platforms(self): + """The Mattermost guard must not silently neuter Telegram/other chats.""" + user_config = {"display": {"thinking_progress": True}} + + assert _resolve_gateway_display_bool( + user_config, + "telegram", + "thinking_progress", + default=False, + platform=Platform.TELEGRAM, + require_platform_override_for={Platform.MATTERMOST}, + ) is True + + # --------------------------------------------------------------------------- # Platform & Config # --------------------------------------------------------------------------- @@ -347,6 +441,24 @@ class TestMattermostSend: payload = self.adapter._api_post.call_args_list[0][0][1] assert payload["root_id"] == "root_post" + @pytest.mark.asyncio + async def test_progress_send_with_invalid_thread_root_never_falls_back_flat(self): + """Tool/status/progress bubbles must stay quiet when the thread is broken.""" + self.adapter._reply_mode = "thread" + self.adapter._api_get = AsyncMock(return_value={"id": "bad_root", "root_id": ""}) + self.adapter._api_post = AsyncMock(return_value={}) + + result = await self.adapter.send( + "channel_1", + "⚙️ terminal...", + metadata={"thread_id": "bad_root"}, + ) + + assert result.success is False + assert self.adapter._api_post.call_count == 1 + payload = self.adapter._api_post.call_args_list[0][0][1] + assert payload["root_id"] == "bad_root" + @pytest.mark.asyncio async def test_send_api_failure(self): """When API returns error, send should return failure.""" diff --git a/tests/gateway/test_stream_consumer_thread_routing.py b/tests/gateway/test_stream_consumer_thread_routing.py index b2b7f22ffe5..3c84aef4fa8 100644 --- a/tests/gateway/test_stream_consumer_thread_routing.py +++ b/tests/gateway/test_stream_consumer_thread_routing.py @@ -106,6 +106,42 @@ class TestInitialReplyToId: assert call_kwargs["metadata"] == {**metadata, "expect_edits": True} assert metadata == {"thread_id": "omt_topic789"} + @pytest.mark.asyncio + async def test_final_first_send_marks_metadata_notify_true(self): + """Final streaming sends should use the existing notify=True marker.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + metadata={"thread_id": "root_post_123"}, + initial_reply_to_id="reply_post_456", + ) + + await consumer._send_or_edit("Final answer", finalize=True) + + call_kwargs = adapter.send.call_args[1] + metadata = call_kwargs["metadata"] + assert metadata["thread_id"] == "root_post_123" + assert metadata["notify"] is True + assert "delivery_kind" not in metadata + assert "allow_flat_fallback" not in metadata + + @pytest.mark.asyncio + async def test_nonfinal_first_send_does_not_mark_notify(self): + """Preview/interim streaming sends must not be notify-worthy.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + metadata={"thread_id": "root_post_123"}, + initial_reply_to_id="reply_post_456", + ) + + await consumer._send_or_edit("Preview", finalize=False) + + metadata = adapter.send.call_args[1]["metadata"] + assert metadata == {"thread_id": "root_post_123", "expect_edits": True} + class TestOverflowFirstMessage: """Verify thread routing is preserved when the first message overflows.""" diff --git a/tests/gateway/test_telegram_overflow_partial.py b/tests/gateway/test_telegram_overflow_partial.py index 76e4d16a617..38b10299dc3 100644 --- a/tests/gateway/test_telegram_overflow_partial.py +++ b/tests/gateway/test_telegram_overflow_partial.py @@ -134,7 +134,7 @@ async def test_stream_consumer_fallback_sends_tail_after_partial_overflow(): adapter.send.assert_awaited_once() assert adapter.send.await_args.kwargs["content"] == "world" - assert adapter.send.await_args.kwargs["metadata"] == {"thread_id": "77"} + assert adapter.send.await_args.kwargs["metadata"] == {"thread_id": "77", "notify": True} adapter.delete_message.assert_not_awaited() assert consumer.final_response_sent is True assert consumer.final_content_delivered is True diff --git a/tests/gateway/test_tts_media_routing.py b/tests/gateway/test_tts_media_routing.py index eaf9c592808..016be97ea27 100644 --- a/tests/gateway/test_tts_media_routing.py +++ b/tests/gateway/test_tts_media_routing.py @@ -76,7 +76,7 @@ async def test_base_adapter_routes_telegram_flac_media_tag_to_document_sender(tm adapter.send_document.assert_awaited_once_with( chat_id="chat-1", file_path=str(media_file), - metadata=None, + metadata={"notify": True}, ) adapter.send_voice.assert_not_awaited() @@ -95,7 +95,7 @@ async def test_base_adapter_routes_non_voice_telegram_ogg_media_tag_to_document_ adapter.send_document.assert_awaited_once_with( chat_id="chat-1", file_path=str(media_file), - metadata=None, + metadata={"notify": True}, ) adapter.send_voice.assert_not_awaited() @@ -116,7 +116,7 @@ async def test_base_adapter_routes_voice_tagged_telegram_ogg_media_tag_to_voice_ adapter.send_voice.assert_awaited_once_with( chat_id="chat-1", audio_path=str(media_file), - metadata=None, + metadata={"notify": True}, ) adapter.send_document.assert_not_awaited()