fix(gateway): don't let delayed Discord status messages partition history backfill

Discord channel-history backfill partitions on Hermes' last self-authored
message. Asynchronous, non-conversational status sends (self-improvement
review bubbles, heartbeats, background-process notifications, update status,
gateway restart/online notices) land as ordinary bot messages, so a delayed
status bump becomes the history boundary and swallows real messages that
arrived after Hermes' actual reply.

Mark these sends at the source via metadata["non_conversational"] (Discord
only; other platforms' metadata is unchanged). The adapter no longer advances
the history-boundary cache for marked sends and persists their IDs to a
sidecar JSON so the cold-start scan can skip them by ID after a restart. A
narrow regex recognizer remains only as an upgrade bridge for status bumps
emitted by an older gateway that pre-dates the marking.
This commit is contained in:
snav 2026-06-10 19:54:18 -04:00 committed by Teknium
parent b936f92b25
commit caaa916289
3 changed files with 268 additions and 23 deletions

View file

@ -195,6 +195,19 @@ def _gateway_platform_value(platform: Any) -> str:
return str(getattr(platform, "value", platform) or "").strip().lower()
def _non_conversational_metadata(
    metadata: Optional[Dict[str, Any]] = None,
    *,
    platform: Any = None,
) -> Optional[Dict[str, Any]]:
    """Tag send metadata as status-only when the target platform is Discord.

    Args:
        metadata: Existing send metadata, or ``None``. Never mutated.
        platform: Platform identifier, resolved through
            ``_gateway_platform_value``.

    Returns:
        For Discord, a new dict holding the original entries plus
        ``"non_conversational": True``. For every other platform the
        ``metadata`` argument is returned untouched (including ``None``).
    """
    targets_discord = _gateway_platform_value(platform) == "discord"
    if not targets_discord:
        return metadata
    # Copy first so the caller's dict is not modified by the marker.
    tagged = dict(metadata or {})
    tagged.update(non_conversational=True)
    return tagged
def _is_transient_network_error(exc: BaseException) -> bool:
"""Return True for transient network errors safe to log + swallow.
@ -11746,7 +11759,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
chunks = [clean[i:i + max_chunk] for i in range(0, len(clean), max_chunk)]
for chunk in chunks:
try:
await adapter.send(chat_id, f"```\n{chunk}\n```", metadata=metadata)
await adapter.send(
chat_id,
f"```\n{chunk}\n```",
metadata=_non_conversational_metadata(metadata, platform=platform),
)
except Exception as e:
logger.debug("Update stream send failed: %s", e)
@ -11769,12 +11786,16 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
exit_code_raw = exit_code_path.read_text().strip() or "1"
exit_code = int(exit_code_raw)
if exit_code == 0:
await adapter.send(chat_id, "✅ Hermes update finished.", metadata=metadata)
await adapter.send(
chat_id,
"✅ Hermes update finished.",
metadata=_non_conversational_metadata(metadata, platform=platform),
)
else:
await adapter.send(
chat_id,
"❌ Hermes update failed (exit code {}).".format(exit_code),
metadata=metadata,
metadata=_non_conversational_metadata(metadata, platform=platform),
)
logger.info("Update finished (exit=%s), notified %s", exit_code, session_key)
except Exception as e:
@ -11825,7 +11846,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
prompt=prompt_text,
default=default,
session_key=session_key,
metadata=metadata,
metadata=_non_conversational_metadata(metadata, platform=platform),
)
sent_buttons = True
except Exception as btn_err:
@ -11839,7 +11860,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
f"{prompt_text}{default_hint}\n\n"
f"Reply `{_p}approve` (yes) or `{_p}deny` (no), "
f"or type your answer directly.",
metadata=metadata,
metadata=_non_conversational_metadata(metadata, platform=platform),
)
# Keep the prompt marker on disk until the user
# answers. If the gateway restarts mid-prompt, the
@ -11863,7 +11884,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
await adapter.send(
chat_id,
"❌ Hermes update timed out after 30 minutes.",
metadata=metadata,
metadata=_non_conversational_metadata(metadata, platform=platform),
)
except Exception:
pass
@ -11969,7 +11990,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
msg = "✅ Hermes update finished successfully."
else:
msg = "❌ Hermes update failed. Check the gateway logs or run `hermes update` manually for details."
await adapter.send(chat_id, msg, metadata=metadata)
await adapter.send(
chat_id,
msg,
metadata=_non_conversational_metadata(metadata, platform=platform),
)
logger.info(
"Sent post-update notification to %s:%s (exit=%s)",
platform_str,
@ -12032,7 +12057,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
result = await adapter.send(
str(chat_id),
"♻ Gateway restarted successfully. Your session continues.",
metadata=metadata,
metadata=_non_conversational_metadata(metadata, platform=platform),
)
# adapter.send() catches provider errors (e.g. "Chat not found")
# and returns SendResult(success=False) rather than raising, so
@ -12099,9 +12124,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
adapter=adapter,
)
if metadata:
result = await adapter.send(str(home.chat_id), message, metadata=metadata)
result = await adapter.send(
str(home.chat_id),
message,
metadata=_non_conversational_metadata(metadata, platform=platform),
)
else:
result = await adapter.send(str(home.chat_id), message)
result = await adapter.send(
str(home.chat_id),
message,
metadata=_non_conversational_metadata(platform=platform),
)
if result is not None and getattr(result, "success", True) is False:
logger.warning(
"Home-channel startup notification failed for %s:%s: %s",
@ -12742,7 +12775,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
if adapter and chat_id:
try:
send_meta = {"thread_id": thread_id} if thread_id else None
await adapter.send(chat_id, message_text, metadata=send_meta)
await adapter.send(
chat_id,
message_text,
metadata=_non_conversational_metadata(send_meta, platform=platform_name),
)
except Exception as e:
logger.error("Watcher delivery error: %s", e)
break
@ -12763,7 +12800,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
if adapter and chat_id:
try:
send_meta = {"thread_id": thread_id} if thread_id else None
await adapter.send(chat_id, message_text, metadata=send_meta)
await adapter.send(
chat_id,
message_text,
metadata=_non_conversational_metadata(send_meta, platform=platform_name),
)
except Exception as e:
logger.error("Watcher delivery error: %s", e)
@ -14144,6 +14185,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
if _progress_thread_id == source.thread_id
else {"thread_id": _progress_thread_id}
) if _progress_thread_id else None
_progress_metadata = _non_conversational_metadata(_progress_metadata, platform=source.platform)
_progress_reply_to = (
event_message_id
if source.platform in (Platform.FEISHU, Platform.MATTERMOST) and source.thread_id and event_message_id
@ -14906,7 +14948,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
_status_adapter.send(
_status_chat_id,
message,
metadata=_status_thread_metadata,
metadata=_non_conversational_metadata(_status_thread_metadata, platform=source.platform),
),
_loop_for_step,
logger=logger,
@ -15748,7 +15790,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
_notify_res = await _notify_adapter.send(
source.chat_id,
_heartbeat_text,
metadata=_status_thread_metadata,
metadata=_non_conversational_metadata(_status_thread_metadata, platform=source.platform),
)
if getattr(_notify_res, "success", False) and getattr(
_notify_res, "message_id", None

View file

@ -14,6 +14,7 @@ import hashlib
import json
import logging
import os
import re
import struct
import subprocess
import tempfile
@ -29,6 +30,7 @@ VALID_THREAD_AUTO_ARCHIVE_MINUTES = {60, 1440, 4320, 10080}
_DISCORD_COMMAND_SYNC_POLICIES = {"safe", "bulk", "off"}
_DISCORD_COMMAND_SYNC_STATE_SUBDIR = "gateway"
_DISCORD_COMMAND_SYNC_STATE_FILENAME = "discord_command_sync_state.json"
_DISCORD_NONCONVERSATIONAL_STATE_FILENAME = "discord_nonconversational_messages.json"
_DISCORD_COMMAND_SYNC_MUTATION_INTERVAL_SECONDS = 4.5
_DISCORD_COMMAND_SYNC_MAX_RATE_LIMIT_SLEEP_SECONDS = 30.0
# Discord enforces a hard cap of 100 global application (slash) commands per
@ -37,6 +39,37 @@ _DISCORD_COMMAND_SYNC_MAX_RATE_LIMIT_SLEEP_SECONDS = 30.0
# every slash command — not just the overflow ones. We keep the desired set
# at or below this limit at registration time.
_DISCORD_MAX_APP_COMMANDS = 100
# Metadata keys that mark an outbound send as status-only: a truthy value
# under any of them is enough (see _metadata_marks_nonconversational).
# "non_conversational" is the key the gateway-side marker helper sets;
# NOTE(review): no emitter of "non_conversational_history" is visible in this
# change — presumably an accepted alias; confirm before removing.
_DISCORD_NONCONVERSATIONAL_METADATA_KEYS = frozenset({
"non_conversational",
"non_conversational_history",
})
# Upgrade-bridge fallback only. The primary mechanism is the persisted
# non-conversational message-ID set populated from explicitly marked sends
# (metadata["non_conversational"]). These regexes exist solely to recognize
# status bumps emitted by an older gateway version that pre-dates the marking,
# so they don't partition history after an upgrade. New emitters should set the
# metadata flag, not rely on a regex here.
# Each pattern is applied with .match() against the full message text and is
# case-insensitive; leading whitespace is tolerated by every pattern.
_DISCORD_NONCONVERSATIONAL_HISTORY_MESSAGE_PATTERNS = (
# "💾 Self-improvement review: <anything non-empty>" (may span lines).
re.compile(r"^\s*💾\s*Self-improvement review:\s+\S[\s\S]*$", re.IGNORECASE),
# Legacy/background-review test doubles used this shorter form before the
# self-improvement prefix became the stable emitter contract.
re.compile(
r"^\s*💾\s+Skill\s+['\"].+?['\"]\s+(?:created|updated|improved|patched)\.?\s*$",
re.IGNORECASE,
),
# "⏳ Working — <N> min" followed by whitespace or end of text
# (presumably the long-running heartbeat text — confirm against the emitter).
re.compile(r"^\s*⏳\s+Working\s+—\s+\d+\s+min(?:\s|$)", re.IGNORECASE),
# Bracketed background-process notices, e.g.
# "[Background process <id> finished with exit code ...]"; the closing "]"
# must be the last non-whitespace character.
re.compile(
r"^\s*\[Background process\s+\S+\s+"
r"(?:finished with exit code|is still running~)[\s\S]*\]\s*$",
re.IGNORECASE,
),
# Update outcome notices: "✅ Hermes update finished...",
# "❌ Hermes update failed...", "❌ Hermes update timed out...".
re.compile(
r"^\s*(?:✅|❌)\s+Hermes update\s+"
r"(?:finished|failed|timed out)[\s\S]*$",
re.IGNORECASE,
),
# "♻ Gateway restarted successfully..." / "♻ Gateway online..."; the emoji
# variation selector after ♻ is optional.
re.compile(r"^\s*♻️?\s+Gateway\s+(?:restarted successfully|online\b)[\s\S]*$", re.IGNORECASE),
)
try:
import discord
@ -55,7 +88,6 @@ from pathlib import Path as _Path
sys.path.insert(0, str(_Path(__file__).resolve().parents[2]))
from gateway.config import Platform, PlatformConfig
import re
from gateway.platforms.helpers import MessageDeduplicator, ThreadParticipationTracker
from utils import atomic_json_write
@ -132,6 +164,73 @@ def _find_discord_windows_bundled_opus(discord_module: Any = None) -> Optional[s
return None
class _DiscordNonConversationalMessageTracker:
"""Persistent bounded set of Discord message IDs that are status noise."""
_MAX_TRACKED = 2000
def __init__(self, max_tracked: int = _MAX_TRACKED):
self._max_tracked = max_tracked
self._ids: dict[str, None] = dict.fromkeys(self._load())
def _state_path(self) -> _Path:
from hermes_constants import get_hermes_home
return (
get_hermes_home()
/ _DISCORD_COMMAND_SYNC_STATE_SUBDIR
/ _DISCORD_NONCONVERSATIONAL_STATE_FILENAME
)
def _load(self) -> list[str]:
path = self._state_path()
if not path.exists():
return []
try:
data = json.loads(path.read_text(encoding="utf-8"))
if isinstance(data, list):
return [str(message_id) for message_id in data if str(message_id).strip()]
except Exception:
logger.debug("[%s] Failed to load non-conversational Discord IDs", "Discord")
return []
def _save(self) -> None:
ids = list(self._ids)
if len(ids) > self._max_tracked:
ids = ids[-self._max_tracked:]
self._ids = dict.fromkeys(ids)
try:
atomic_json_write(self._state_path(), ids, indent=None)
except Exception:
logger.debug("[%s] Failed to save non-conversational Discord IDs", "Discord", exc_info=True)
def mark_many(self, message_ids: List[str]) -> None:
changed = False
for message_id in message_ids:
key = str(message_id or "").strip()
if key and key not in self._ids:
self._ids[key] = None
changed = True
if changed:
self._save()
def __contains__(self, message_id: str) -> bool:
return str(message_id or "") in self._ids
def _metadata_marks_nonconversational(metadata: Optional[Dict[str, Any]]) -> bool:
"""Return True when an outbound send was explicitly marked as status-only."""
if not isinstance(metadata, dict):
return False
return any(bool(metadata.get(key)) for key in _DISCORD_NONCONVERSATIONAL_METADATA_KEYS)
def _looks_like_nonconversational_history_message(content: str) -> bool:
    """Legacy recognizer: does ``content`` match a known status-bump pattern?

    Used for status messages whose IDs were never persisted. Falsy content is
    treated as the empty string; the text is tested with ``.match()`` against
    each entry of ``_DISCORD_NONCONVERSATIONAL_HISTORY_MESSAGE_PATTERNS``.
    """
    candidate = content if content else ""
    for status_pattern in _DISCORD_NONCONVERSATIONAL_HISTORY_MESSAGE_PATTERNS:
        if status_pattern.match(candidate):
            return True
    return False
def _clean_discord_id(entry: str) -> str:
"""Strip common prefixes from a Discord user ID or username entry.
@ -681,6 +780,9 @@ class DiscordAdapter(BasePlatformAdapter):
# history backfill to skip the full scan on hot paths. Falls back to
# scanning channel.history() on cache miss (cold start / restart).
self._last_self_message_id: Dict[str, str] = {}
# Persistent set of bot-authored lifecycle/status message IDs that
# should not act as conversational history boundaries after restart.
self._nonconversational_messages = _DiscordNonConversationalMessageTracker()
def _handle_bot_task_done(self, task: asyncio.Task) -> None:
"""Surface post-startup discord.py task exits to the gateway supervisor.
@ -1577,6 +1679,7 @@ class DiscordAdapter(BasePlatformAdapter):
thread_id = None
if metadata and metadata.get("thread_id"):
thread_id = metadata["thread_id"]
nonconversational = _metadata_marks_nonconversational(metadata)
if thread_id:
# Fetch the thread directly — threads are addressed by their own ID.
@ -1654,7 +1757,10 @@ class DiscordAdapter(BasePlatformAdapter):
# backfill — avoids a full channel.history() scan on hot paths.
if message_ids:
_target_id = thread_id or chat_id
self._last_self_message_id[_target_id] = message_ids[-1]
if nonconversational:
self._nonconversational_messages.mark_many(message_ids)
elif not _looks_like_nonconversational_history_message(content):
self._last_self_message_id[_target_id] = message_ids[-1]
return SendResult(
success=True,
@ -4203,23 +4309,29 @@ class DiscordAdapter(BasePlatformAdapter):
after=_after_obj,
oldest_first=False,
):
# Skip system messages (pins, joins, thread renames, etc.)
if msg.type not in {discord.MessageType.default, discord.MessageType.reply}:
continue
content = getattr(msg, "clean_content", msg.content) or ""
if (
str(getattr(msg, "id", "")) in self._nonconversational_messages
or _looks_like_nonconversational_history_message(content)
):
continue
# Stop at our own message — this is the partition point.
# Everything before this is already in the session transcript.
# (Redundant when _after_obj is set, but needed for cold start.)
if msg.author == self._client.user:
break
# Skip system messages (pins, joins, thread renames, etc.)
if msg.type not in {discord.MessageType.default, discord.MessageType.reply}:
continue
# Respect DISCORD_ALLOW_BOTS for other bots.
# For history context, "mentions" is treated as "all" — we are
# deciding what context to show, not whether to respond.
if getattr(msg.author, "bot", False) and not include_other_bots:
continue
content = getattr(msg, "clean_content", msg.content) or ""
if not content and msg.attachments:
content = "(attachment)"
if not content:
@ -4693,6 +4805,8 @@ class DiscordAdapter(BasePlatformAdapter):
)
msg = await channel.send(embed=embed, view=view)
view._message = msg # store for on_timeout expiration editing
if _metadata_marks_nonconversational(metadata):
self._nonconversational_messages.mark_many([str(msg.id)])
return SendResult(success=True, message_id=str(msg.id))
except Exception as e:
return SendResult(success=False, error=str(e))

View file

@ -666,6 +666,70 @@ async def test_fetch_channel_context_stops_at_self_message_and_reverses_to_chron
)
@pytest.mark.asyncio
async def test_fetch_channel_context_skips_self_improvement_boundary_message(adapter, monkeypatch):
    """Status bumps must be skipped rather than treated as the history boundary.

    Covers both recognition paths: message 9 is skipped by its persisted ID,
    the other status messages by the legacy text patterns. Only the plain
    self-authored reply (message 1) is expected to stop the scan.
    """
    monkeypatch.setenv("DISCORD_ALLOW_BOTS", "all")
    adapter.config.extra["history_backfill_limit"] = 10

    hermes = adapter._client.user
    codex = SimpleNamespace(id=55, display_name="Codex", name="Codex", bot=True)
    alice = SimpleNamespace(id=56, display_name="Alice", name="Alice", bot=False)

    # (author, content, msg_id) rows, listed with descending message IDs.
    rows = [
        (hermes, "arbitrary lifecycle text from a metadata-marked send", 9),
        (
            hermes,
            "[Background process bg-123 finished with exit code 0~ Here's the final output:\nok]",
            8,
        ),
        (codex, "♻ Gateway restarted successfully. Your session continues.", 7),
        (codex, "💾 Self-improvement review: Memory updated", 6),
        (alice, "question after reply", 5),
        (hermes, "💾 Self-improvement review: Skill 'hermes-gateway-display-config' patched", 4),
        (codex, "Codex final answer", 3),
        (alice, "prompt before reply", 2),
        (hermes, "our prior response", 1),
    ]
    history = [
        make_history_message(author=author, content=text, msg_id=msg_id)
        for author, text, msg_id in rows
    ]
    channel = FakeHistoryChannel(history, channel_id=123)

    # Message 9 carries no recognizable text; only its tracked ID marks it.
    adapter._nonconversational_messages.mark_many(["9"])

    trigger = make_message(channel=channel, content="trigger")
    result = await adapter._fetch_channel_context(channel, before=trigger)

    expected_lines = [
        "[Recent channel messages]",
        "[Alice] prompt before reply",
        "[Codex [bot]] Codex final answer",
        "[Alice] question after reply",
    ]
    assert result == "\n".join(expected_lines)
def test_nonconversational_fallback_requires_self_improvement_emoji():
    """The legacy recognizer needs the 💾 prefix; the bare heading is ordinary text."""
    recognize = discord_platform._looks_like_nonconversational_history_message

    with_emoji = "💾 Self-improvement review: Memory updated"
    without_emoji = "Self-improvement review: this is a normal assistant heading"

    assert recognize(with_emoji)
    assert not recognize(without_emoji)
@pytest.mark.asyncio
async def test_fetch_channel_context_skips_other_bots_when_allow_bots_none(adapter, monkeypatch):
monkeypatch.setenv("DISCORD_ALLOW_BOTS", "none")
@ -801,6 +865,33 @@ async def test_fetch_channel_context_ignores_stale_cache(adapter, monkeypatch):
assert recorded_after["value"] is None
@pytest.mark.asyncio
async def test_discord_send_does_not_cache_nonconversational_status_as_history_boundary(adapter):
    """A metadata-marked send keeps the cached boundary and records its own ID."""

    class SendingChannel(FakeTextChannel):
        async def send(self, content, reference=None):
            # Every send "succeeds" and yields message ID 222.
            return SimpleNamespace(id=222)

    channel = SendingChannel(channel_id=777)

    def get_channel(channel_id):
        if channel_id == 777:
            return channel
        return None

    adapter._client = SimpleNamespace(
        user=adapter._client.user,
        get_channel=get_channel,
        fetch_channel=AsyncMock(return_value=channel),
    )
    # Pre-existing boundary that the status send must leave alone.
    adapter._last_self_message_id["777"] = "111"

    status_metadata = {"non_conversational": True}
    result = await adapter.send(
        "777",
        "arbitrary lifecycle text from gateway",
        metadata=status_metadata,
    )

    assert result.success is True
    assert adapter._last_self_message_id["777"] == "111"
    assert "222" in adapter._nonconversational_messages
@pytest.mark.asyncio
async def test_discord_shared_channel_backfill_prepends_context(adapter, monkeypatch):
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "true")
@ -925,5 +1016,3 @@ async def test_discord_auto_thread_skips_backfill(adapter, monkeypatch):
adapter._auto_create_thread.assert_awaited_once()
adapter._fetch_channel_context.assert_not_awaited()