fix(gateway): dedupe user turns on transient failure (#47237)

When the gateway persists a user message after a transient provider
failure (429/timeout/auth error), subsequent retries of the same
Telegram message could stack duplicate user turns in the transcript,
causing the agent to fall behind by 1-2 messages.

Add has_platform_message_id() to SessionDB (using the existing
idx_messages_platform_msg_id partial index) and a SessionStore wrapper.
The gateway's transient-failure path checks this before
append_to_transcript -- if the platform_message_id is already
persisted, the duplicate write is skipped.

Salvaged from #47869 by @davidgut1982. Adapted to current main which
has additional append sites and an existing content-based dedupe in
the exception handler path.

Closes #47237
This commit is contained in:
davidgut1982 2026-06-25 12:09:17 +05:30 committed by kshitijk4poor
parent d6cf383d74
commit 6208d6b3be
5 changed files with 167 additions and 4 deletions

View file

@ -10209,11 +10209,27 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
}
if event.message_id:
_user_entry["message_id"] = str(event.message_id)
self.session_store.append_to_transcript(
session_entry.session_id,
_user_entry,
skip_db=agent_persisted,
# Dedupe: skip if this platform message_id is already in the
# transcript (prevents duplicate user turns on Telegram retries
# after transient failures). #47237
_skip_persist = (
event.message_id
and self.session_store.has_platform_message_id(
session_entry.session_id, str(event.message_id)
)
)
if _skip_persist:
logger.info(
"Skipping duplicate user turn "
"(message_id=%s) in session %s",
event.message_id, session_entry.session_id,
)
else:
self.session_store.append_to_transcript(
session_entry.session_id,
_user_entry,
skip_db=agent_persisted,
)
else:
history_len = agent_result.get("history_offset", len(history))
new_messages = agent_messages[history_len:] if len(agent_messages) > history_len else []

View file

@ -1460,6 +1460,25 @@ class SessionStore:
except Exception as e:
logger.debug("Session DB operation failed: %s", e)
def has_platform_message_id(
self, session_id: str, platform_message_id: str
) -> bool:
"""Check if a message with the given platform_message_id is persisted.
Thin wrapper over SessionDB.has_platform_message_id(). Returns False
when no DB is available (in-memory sessions). Used by the gateway's
transient-failure dedupe guard (#47237).
"""
if not self._db:
return False
try:
return self._db.has_platform_message_id(
session_id, platform_message_id
)
except Exception:
logger.debug("has_platform_message_id lookup failed", exc_info=True)
return False
def rewrite_transcript(self, session_id: str, messages: List[Dict[str, Any]]) -> None:
"""Replace the entire transcript for a session with new messages.

View file

@ -3935,6 +3935,24 @@ class SessionDB:
cursor = self._conn.execute("SELECT COUNT(*) FROM messages")
return cursor.fetchone()[0]
def has_platform_message_id(
self, session_id: str, platform_message_id: str
) -> bool:
"""Check if a message with the given platform_message_id exists.
Uses the idx_messages_platform_msg_id partial index for efficient
lookup. Used by the gateway's transient-failure dedupe guard (#47237)
to skip re-persisting a user message that was already saved on a
prior retry of the same inbound platform message.
"""
with self._lock:
cursor = self._conn.execute(
"SELECT 1 FROM messages "
"WHERE session_id = ? AND platform_message_id = ? LIMIT 1",
(session_id, platform_message_id),
)
return cursor.fetchone() is not None
# =========================================================================
# Export and cleanup
# =========================================================================

View file

@ -65,6 +65,9 @@ def _bootstrap(monkeypatch, tmp_path):
)
runner.session_store.load_transcript.return_value = []
runner.session_store.append_to_transcript = MagicMock()
# Mock has_platform_message_id to return False so the dedupe guard
# (#47237) in gateway/run.py does not skip the append_to_transcript call.
runner.session_store.has_platform_message_id.return_value = False
runner.session_store.update_session = MagicMock()
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)

View file

@ -0,0 +1,107 @@
"""Regression tests for issue #47237.
When the gateway persists a user message after a transient provider
failure (429/timeout/auth error), subsequent retries of the same
Telegram message must not stack duplicate user turns in the transcript.
The dedupe guard checks has_platform_message_id before persisting.
"""
from gateway.session import SessionStore
from hermes_state import SessionDB
class TestHasPlatformMessageId:
"""SessionDB.has_platform_message_id and SessionStore wrapper."""
def _make_db(self, tmp_path):
db = SessionDB(tmp_path / "state.db")
db.create_session("s1", "cli")
return db
def test_returns_false_when_not_present(self, tmp_path):
db = self._make_db(tmp_path)
assert not db.has_platform_message_id("s1", "msg-999")
def test_returns_true_after_append(self, tmp_path):
db = self._make_db(tmp_path)
db.append_message(
session_id="s1",
role="user",
content="hello",
platform_message_id="msg-123",
)
assert db.has_platform_message_id("s1", "msg-123")
def test_returns_false_for_different_session(self, tmp_path):
db = self._make_db(tmp_path)
db.create_session("s2", "cli")
db.append_message(
session_id="s1",
role="user",
content="hello",
platform_message_id="msg-123",
)
assert not db.has_platform_message_id("s2", "msg-123")
def test_session_store_wrapper_returns_false_without_db(self, tmp_path):
store = SessionStore.__new__(SessionStore)
store._db = None
assert not store.has_platform_message_id("s1", "msg-123")
def test_session_store_wrapper_proxies_to_db(self, tmp_path):
db = self._make_db(tmp_path)
db.append_message(
session_id="s1",
role="user",
content="hello",
platform_message_id="msg-456",
)
store = SessionStore.__new__(SessionStore)
store._db = db
assert store.has_platform_message_id("s1", "msg-456")
assert not store.has_platform_message_id("s1", "msg-000")
class TestDedupeOnTransientFailure:
"""The gateway's transient-failure path must not persist duplicates."""
@staticmethod
def _make_db(tmp_path):
db = SessionDB(tmp_path / "state.db")
db.create_session("s1", "cli")
return db
def test_duplicate_message_id_skipped(self, tmp_path):
"""When has_platform_message_id returns True, the append is skipped."""
db = self._make_db(tmp_path)
db.append_message(
session_id="s1",
role="user",
content="hello",
platform_message_id="msg-789",
)
store = SessionStore.__new__(SessionStore)
store._db = db
# Simulate a second attempt to persist the same message
assert store.has_platform_message_id("s1", "msg-789")
# The gateway code checks this before calling append_to_transcript,
# so the second append should never fire.
def test_different_message_id_persists(self, tmp_path):
"""A new message_id should always be persisted."""
db = self._make_db(tmp_path)
db.append_message(
session_id="s1",
role="user",
content="first",
platform_message_id="msg-001",
)
assert not db.has_platform_message_id("s1", "msg-002")
db.append_message(
session_id="s1",
role="user",
content="second",
platform_message_id="msg-002",
)
assert db.has_platform_message_id("s1", "msg-002")