From caaa916289f2ab9b02049d819523632e05588784 Mon Sep 17 00:00:00 2001 From: snav Date: Wed, 10 Jun 2026 19:54:18 -0400 Subject: [PATCH] fix(gateway): don't let delayed Discord status messages partition history backfill Discord channel-history backfill partitions on Hermes' last self-authored message. Asynchronous, non-conversational status sends (self-improvement review bubbles, heartbeats, background-process notifications, update status, gateway restart/online notices) land as ordinary bot messages, so a delayed status bump becomes the history boundary and swallows real messages that arrived after Hermes' actual reply. Mark these sends at the source via metadata["non_conversational"] (Discord only; other platforms' metadata is unchanged). The adapter no longer advances the history-boundary cache for marked sends and persists their IDs to a sidecar JSON so the cold-start scan can skip them by ID after a restart. A narrow regex recognizer remains only as an upgrade bridge for status bumps emitted by an older gateway that pre-dates the marking. --- gateway/run.py | 70 ++++++++--- plugins/platforms/discord/adapter.py | 128 ++++++++++++++++++-- tests/gateway/test_discord_free_response.py | 93 +++++++++++++- 3 files changed, 268 insertions(+), 23 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 741f2a235ad..e612d8a34d5 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -195,6 +195,19 @@ def _gateway_platform_value(platform: Any) -> str: return str(getattr(platform, "value", platform) or "").strip().lower() +def _non_conversational_metadata( + metadata: Optional[Dict[str, Any]] = None, + *, + platform: Any = None, +) -> Optional[Dict[str, Any]]: + """Mark Discord lifecycle/status sends without changing other platforms.""" + if _gateway_platform_value(platform) != "discord": + return metadata + merged = dict(metadata or {}) + merged["non_conversational"] = True + return merged + + def _is_transient_network_error(exc: BaseException) -> bool: """Return True for transient network errors safe to log + swallow. @@ -11746,7 +11759,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew chunks = [clean[i:i + max_chunk] for i in range(0, len(clean), max_chunk)] for chunk in chunks: try: - await adapter.send(chat_id, f"```\n{chunk}\n```", metadata=metadata) + await adapter.send( + chat_id, + f"```\n{chunk}\n```", + metadata=_non_conversational_metadata(metadata, platform=platform), + ) except Exception as e: logger.debug("Update stream send failed: %s", e) @@ -11769,12 +11786,16 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew exit_code_raw = exit_code_path.read_text().strip() or "1" exit_code = int(exit_code_raw) if exit_code == 0: - await adapter.send(chat_id, "✅ Hermes update finished.", metadata=metadata) + await adapter.send( + chat_id, + "✅ Hermes update finished.", + metadata=_non_conversational_metadata(metadata, platform=platform), + ) else: await adapter.send( chat_id, "❌ Hermes update failed (exit code {}).".format(exit_code), - metadata=metadata, + metadata=_non_conversational_metadata(metadata, platform=platform), ) logger.info("Update finished (exit=%s), notified %s", exit_code, session_key) except Exception as e: @@ -11825,7 +11846,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew prompt=prompt_text, default=default, session_key=session_key, - metadata=metadata, + metadata=_non_conversational_metadata(metadata, platform=platform), ) sent_buttons = True except Exception as btn_err: @@ -11839,7 +11860,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew f"{prompt_text}{default_hint}\n\n" f"Reply `{_p}approve` (yes) or `{_p}deny` (no), " f"or type your answer directly.", - metadata=metadata, + metadata=_non_conversational_metadata(metadata, platform=platform), ) # Keep the prompt marker on disk until the user # answers. If the gateway restarts mid-prompt, the @@ -11863,7 +11884,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew await adapter.send( chat_id, "❌ Hermes update timed out after 30 minutes.", - metadata=metadata, + metadata=_non_conversational_metadata(metadata, platform=platform), ) except Exception: pass @@ -11969,7 +11990,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew msg = "✅ Hermes update finished successfully." else: msg = "❌ Hermes update failed. Check the gateway logs or run `hermes update` manually for details." - await adapter.send(chat_id, msg, metadata=metadata) + await adapter.send( + chat_id, + msg, + metadata=_non_conversational_metadata(metadata, platform=platform), + ) logger.info( "Sent post-update notification to %s:%s (exit=%s)", platform_str, @@ -12032,7 +12057,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew result = await adapter.send( str(chat_id), "♻ Gateway restarted successfully. Your session continues.", - metadata=metadata, + metadata=_non_conversational_metadata(metadata, platform=platform), ) # adapter.send() catches provider errors (e.g. "Chat not found") # and returns SendResult(success=False) rather than raising, so @@ -12099,9 +12124,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew adapter=adapter, ) if metadata: - result = await adapter.send(str(home.chat_id), message, metadata=metadata) + result = await adapter.send( + str(home.chat_id), + message, + metadata=_non_conversational_metadata(metadata, platform=platform), + ) else: - result = await adapter.send(str(home.chat_id), message) + result = await adapter.send( + str(home.chat_id), + message, + metadata=_non_conversational_metadata(platform=platform), + ) if result is not None and getattr(result, "success", True) is False: logger.warning( "Home-channel startup notification failed for %s:%s: %s", @@ -12742,7 +12775,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if adapter and chat_id: try: send_meta = {"thread_id": thread_id} if thread_id else None - await adapter.send(chat_id, message_text, metadata=send_meta) + await adapter.send( + chat_id, + message_text, + metadata=_non_conversational_metadata(send_meta, platform=platform_name), + ) except Exception as e: logger.error("Watcher delivery error: %s", e) break @@ -12763,7 +12800,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if adapter and chat_id: try: send_meta = {"thread_id": thread_id} if thread_id else None - await adapter.send(chat_id, message_text, metadata=send_meta) + await adapter.send( + chat_id, + message_text, + metadata=_non_conversational_metadata(send_meta, platform=platform_name), + ) except Exception as e: logger.error("Watcher delivery error: %s", e) @@ -14144,6 +14185,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if _progress_thread_id == source.thread_id else {"thread_id": _progress_thread_id} ) if _progress_thread_id else None + _progress_metadata = _non_conversational_metadata(_progress_metadata, platform=source.platform) _progress_reply_to = ( event_message_id if source.platform in (Platform.FEISHU, Platform.MATTERMOST) and source.thread_id and event_message_id @@ -14906,7 +14948,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew _status_adapter.send( _status_chat_id, message, - metadata=_status_thread_metadata, + metadata=_non_conversational_metadata(_status_thread_metadata, platform=source.platform), ), _loop_for_step, logger=logger, @@ -15748,7 +15790,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew _notify_res = await _notify_adapter.send( source.chat_id, _heartbeat_text, - metadata=_status_thread_metadata, + metadata=_non_conversational_metadata(_status_thread_metadata, platform=source.platform), ) if getattr(_notify_res, "success", False) and getattr( _notify_res, "message_id", None diff --git a/plugins/platforms/discord/adapter.py b/plugins/platforms/discord/adapter.py index 6ca199dcfaf..607123bbd29 100644 --- a/plugins/platforms/discord/adapter.py +++ b/plugins/platforms/discord/adapter.py @@ -14,6 +14,7 @@ import hashlib import json import logging import os +import re import struct import subprocess import tempfile @@ -29,6 +30,7 @@ VALID_THREAD_AUTO_ARCHIVE_MINUTES = {60, 1440, 4320, 10080} _DISCORD_COMMAND_SYNC_POLICIES = {"safe", "bulk", "off"} _DISCORD_COMMAND_SYNC_STATE_SUBDIR = "gateway" _DISCORD_COMMAND_SYNC_STATE_FILENAME = "discord_command_sync_state.json" +_DISCORD_NONCONVERSATIONAL_STATE_FILENAME = "discord_nonconversational_messages.json" _DISCORD_COMMAND_SYNC_MUTATION_INTERVAL_SECONDS = 4.5 _DISCORD_COMMAND_SYNC_MAX_RATE_LIMIT_SLEEP_SECONDS = 30.0 # Discord enforces a hard cap of 100 global application (slash) commands per @@ -37,6 +39,37 @@ _DISCORD_COMMAND_SYNC_MAX_RATE_LIMIT_SLEEP_SECONDS = 30.0 # every slash command — not just the overflow ones. We keep the desired set # at or below this limit at registration time. _DISCORD_MAX_APP_COMMANDS = 100 +_DISCORD_NONCONVERSATIONAL_METADATA_KEYS = frozenset({ + "non_conversational", + "non_conversational_history", +}) +# Upgrade-bridge fallback only. The primary mechanism is the persisted +# non-conversational message-ID set populated from explicitly marked sends +# (metadata["non_conversational"]). These regexes exist solely to recognize +# status bumps emitted by an older gateway version that pre-dates the marking, +# so they don't partition history after an upgrade. New emitters should set the +# metadata flag, not rely on a regex here. +_DISCORD_NONCONVERSATIONAL_HISTORY_MESSAGE_PATTERNS = ( + re.compile(r"^\s*💾\s*Self-improvement review:\s+\S[\s\S]*$", re.IGNORECASE), + # Legacy/background-review test doubles used this shorter form before the + # self-improvement prefix became the stable emitter contract. + re.compile( + r"^\s*💾\s+Skill\s+['\"].+?['\"]\s+(?:created|updated|improved|patched)\.?\s*$", + re.IGNORECASE, + ), + re.compile(r"^\s*⏳\s+Working\s+—\s+\d+\s+min(?:\s|$)", re.IGNORECASE), + re.compile( + r"^\s*\[Background process\s+\S+\s+" + r"(?:finished with exit code|is still running~)[\s\S]*\]\s*$", + re.IGNORECASE, + ), + re.compile( + r"^\s*(?:✅|❌)\s+Hermes update\s+" + r"(?:finished|failed|timed out)[\s\S]*$", + re.IGNORECASE, + ), + re.compile(r"^\s*♻️?\s+Gateway\s+(?:restarted successfully|online\b)[\s\S]*$", re.IGNORECASE), +) try: import discord @@ -55,7 +88,6 @@ from pathlib import Path as _Path sys.path.insert(0, str(_Path(__file__).resolve().parents[2])) from gateway.config import Platform, PlatformConfig -import re from gateway.platforms.helpers import MessageDeduplicator, ThreadParticipationTracker from utils import atomic_json_write @@ -132,6 +164,73 @@ def _find_discord_windows_bundled_opus(discord_module: Any = None) -> Optional[s return None +class _DiscordNonConversationalMessageTracker: + """Persistent bounded set of Discord message IDs that are status noise.""" + + _MAX_TRACKED = 2000 + + def __init__(self, max_tracked: int = _MAX_TRACKED): + self._max_tracked = max_tracked + self._ids: dict[str, None] = dict.fromkeys(self._load()) + + def _state_path(self) -> _Path: + from hermes_constants import get_hermes_home + + return ( + get_hermes_home() + / _DISCORD_COMMAND_SYNC_STATE_SUBDIR + / _DISCORD_NONCONVERSATIONAL_STATE_FILENAME + ) + + def _load(self) -> list[str]: + path = self._state_path() + if not path.exists(): + return [] + try: + data = json.loads(path.read_text(encoding="utf-8")) + if isinstance(data, list): + return [str(message_id) for message_id in data if str(message_id).strip()] + except Exception: + logger.debug("[%s] Failed to load non-conversational Discord IDs", "Discord") + return [] + + def _save(self) -> None: + ids = list(self._ids) + if len(ids) > self._max_tracked: + ids = ids[-self._max_tracked:] + self._ids = dict.fromkeys(ids) + try: + atomic_json_write(self._state_path(), ids, indent=None) + except Exception: + logger.debug("[%s] Failed to save non-conversational Discord IDs", "Discord", exc_info=True) + + def mark_many(self, message_ids: List[str]) -> None: + changed = False + for message_id in message_ids: + key = str(message_id or "").strip() + if key and key not in self._ids: + self._ids[key] = None + changed = True + if changed: + self._save() + + def __contains__(self, message_id: str) -> bool: + return str(message_id or "") in self._ids + + +def _metadata_marks_nonconversational(metadata: Optional[Dict[str, Any]]) -> bool: + """Return True when an outbound send was explicitly marked as status-only.""" + if not isinstance(metadata, dict): + return False + return any(bool(metadata.get(key)) for key in _DISCORD_NONCONVERSATIONAL_METADATA_KEYS) + + +def _looks_like_nonconversational_history_message(content: str) -> bool: + """Fallback recognizer for legacy status bumps missing persisted IDs.""" + text = content or "" + return any(pattern.match(text) for pattern in _DISCORD_NONCONVERSATIONAL_HISTORY_MESSAGE_PATTERNS) + + def _clean_discord_id(entry: str) -> str: """Strip common prefixes from a Discord user ID or username entry. @@ -681,6 +780,9 @@ class DiscordAdapter(BasePlatformAdapter): # history backfill to skip the full scan on hot paths. Falls back to # scanning channel.history() on cache miss (cold start / restart). self._last_self_message_id: Dict[str, str] = {} + # Persistent set of bot-authored lifecycle/status message IDs that + # should not act as conversational history boundaries after restart. + self._nonconversational_messages = _DiscordNonConversationalMessageTracker() def _handle_bot_task_done(self, task: asyncio.Task) -> None: """Surface post-startup discord.py task exits to the gateway supervisor. @@ -1577,6 +1679,7 @@ class DiscordAdapter(BasePlatformAdapter): thread_id = None if metadata and metadata.get("thread_id"): thread_id = metadata["thread_id"] + nonconversational = _metadata_marks_nonconversational(metadata) if thread_id: # Fetch the thread directly — threads are addressed by their own ID. @@ -1654,7 +1757,10 @@ class DiscordAdapter(BasePlatformAdapter): # backfill — avoids a full channel.history() scan on hot paths. if message_ids: _target_id = thread_id or chat_id - self._last_self_message_id[_target_id] = message_ids[-1] + if nonconversational: + self._nonconversational_messages.mark_many(message_ids) + elif not _looks_like_nonconversational_history_message(content): + self._last_self_message_id[_target_id] = message_ids[-1] return SendResult( success=True, @@ -4203,23 +4309,29 @@ class DiscordAdapter(BasePlatformAdapter): after=_after_obj, oldest_first=False, ): + # Skip system messages (pins, joins, thread renames, etc.) + if msg.type not in {discord.MessageType.default, discord.MessageType.reply}: + continue + + content = getattr(msg, "clean_content", msg.content) or "" + if ( + str(getattr(msg, "id", "")) in self._nonconversational_messages + or _looks_like_nonconversational_history_message(content) + ): + continue + # Stop at our own message — this is the partition point. # Everything before this is already in the session transcript. # (Redundant when _after_obj is set, but needed for cold start.) if msg.author == self._client.user: break - # Skip system messages (pins, joins, thread renames, etc.) - if msg.type not in {discord.MessageType.default, discord.MessageType.reply}: - continue - # Respect DISCORD_ALLOW_BOTS for other bots. # For history context, "mentions" is treated as "all" — we are # deciding what context to show, not whether to respond. if getattr(msg.author, "bot", False) and not include_other_bots: continue - content = getattr(msg, "clean_content", msg.content) or "" if not content and msg.attachments: content = "(attachment)" if not content: @@ -4693,6 +4805,8 @@ class DiscordAdapter(BasePlatformAdapter): ) msg = await channel.send(embed=embed, view=view) view._message = msg # store for on_timeout expiration editing + if _metadata_marks_nonconversational(metadata): + self._nonconversational_messages.mark_many([str(msg.id)]) return SendResult(success=True, message_id=str(msg.id)) except Exception as e: return SendResult(success=False, error=str(e)) diff --git a/tests/gateway/test_discord_free_response.py b/tests/gateway/test_discord_free_response.py index e2133d56c35..39556f6603f 100644 --- a/tests/gateway/test_discord_free_response.py +++ b/tests/gateway/test_discord_free_response.py @@ -666,6 +666,70 @@ async def test_fetch_channel_context_stops_at_self_message_and_reverses_to_chron ) +@pytest.mark.asyncio +async def test_fetch_channel_context_skips_self_improvement_boundary_message(adapter, monkeypatch): + """Delayed harness status bumps must not hide messages after the real reply.""" + monkeypatch.setenv("DISCORD_ALLOW_BOTS", "all") + adapter.config.extra["history_backfill_limit"] = 10 + + codex = SimpleNamespace(id=55, display_name="Codex", name="Codex", bot=True) + human = SimpleNamespace(id=56, display_name="Alice", name="Alice", bot=False) + + channel = FakeHistoryChannel( + [ + make_history_message( + author=adapter._client.user, + content="arbitrary lifecycle text from a metadata-marked send", + msg_id=9, + ), + make_history_message( + author=adapter._client.user, + content="[Background process bg-123 finished with exit code 0~ Here's the final output:\nok]", + msg_id=8, + ), + make_history_message( + author=codex, + content="♻ Gateway restarted successfully. Your session continues.", + msg_id=7, + ), + make_history_message( + author=codex, + content="💾 Self-improvement review: Memory updated", + msg_id=6, + ), + make_history_message(author=human, content="question after reply", msg_id=5), + make_history_message( + author=adapter._client.user, + content="💾 Self-improvement review: Skill 'hermes-gateway-display-config' patched", + msg_id=4, + ), + make_history_message(author=codex, content="Codex final answer", msg_id=3), + make_history_message(author=human, content="prompt before reply", msg_id=2), + make_history_message(author=adapter._client.user, content="our prior response", msg_id=1), + ], + channel_id=123, + ) + adapter._nonconversational_messages.mark_many(["9"]) + + result = await adapter._fetch_channel_context(channel, before=make_message(channel=channel, content="trigger")) + + assert result == ( + "[Recent channel messages]\n" + "[Alice] prompt before reply\n" + "[Codex [bot]] Codex final answer\n" + "[Alice] question after reply" + ) + + +def test_nonconversational_fallback_requires_self_improvement_emoji(): + assert discord_platform._looks_like_nonconversational_history_message( + "💾 Self-improvement review: Memory updated" + ) + assert not discord_platform._looks_like_nonconversational_history_message( + "Self-improvement review: this is a normal assistant heading" + ) + + @pytest.mark.asyncio async def test_fetch_channel_context_skips_other_bots_when_allow_bots_none(adapter, monkeypatch): monkeypatch.setenv("DISCORD_ALLOW_BOTS", "none") @@ -801,6 +865,33 @@ async def test_fetch_channel_context_ignores_stale_cache(adapter, monkeypatch): assert recorded_after["value"] is None +@pytest.mark.asyncio +async def test_discord_send_does_not_cache_nonconversational_status_as_history_boundary(adapter): + """Automated status notifications should not move the backfill boundary.""" + + class SendingChannel(FakeTextChannel): + async def send(self, content, reference=None): + return SimpleNamespace(id=222) + + channel = SendingChannel(channel_id=777) + adapter._client = SimpleNamespace( + user=adapter._client.user, + get_channel=lambda channel_id: channel if channel_id == 777 else None, + fetch_channel=AsyncMock(return_value=channel), + ) + adapter._last_self_message_id["777"] = "111" + + result = await adapter.send( + "777", + "arbitrary lifecycle text from gateway", + metadata={"non_conversational": True}, + ) + + assert result.success is True + assert adapter._last_self_message_id["777"] == "111" + assert "222" in adapter._nonconversational_messages + + @pytest.mark.asyncio async def test_discord_shared_channel_backfill_prepends_context(adapter, monkeypatch): monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "true") @@ -925,5 +1016,3 @@ async def test_discord_auto_thread_skips_backfill(adapter, monkeypatch): adapter._auto_create_thread.assert_awaited_once() adapter._fetch_channel_context.assert_not_awaited() - -