From 6208d6b3be49fe320dbf2a40317e0665038dd60c Mon Sep 17 00:00:00 2001 From: davidgut1982 Date: Thu, 25 Jun 2026 12:09:17 +0530 Subject: [PATCH] fix(gateway): dedupe user turns on transient failure (#47237) When the gateway persists a user message after a transient provider failure (429/timeout/auth error), subsequent retries of the same Telegram message could stack duplicate user turns in the transcript, causing the agent to fall behind by 1-2 messages. Add has_platform_message_id() to SessionDB (using the existing idx_messages_platform_msg_id partial index) and a SessionStore wrapper. The gateway's transient-failure path checks this before append_to_transcript -- if the platform_message_id is already persisted, the duplicate write is skipped. Salvaged from #47869 by @davidgut1982. Adapted to current main which has additional append sites and an existing content-based dedupe in the exception handler path. Closes #47237 --- gateway/run.py | 24 +++- gateway/session.py | 19 ++++ hermes_state.py | 18 +++ .../test_42039_duplicate_user_message.py | 3 + tests/gateway/test_dedupe_user_turns.py | 107 ++++++++++++++++++ 5 files changed, 167 insertions(+), 4 deletions(-) create mode 100644 tests/gateway/test_dedupe_user_turns.py diff --git a/gateway/run.py b/gateway/run.py index bcee403b1ce..db3240c1efe 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -10209,11 +10209,27 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew } if event.message_id: _user_entry["message_id"] = str(event.message_id) - self.session_store.append_to_transcript( - session_entry.session_id, - _user_entry, - skip_db=agent_persisted, + # Dedupe: skip if this platform message_id is already in the + # transcript (prevents duplicate user turns on Telegram retries + # after transient failures). #47237 + _skip_persist = ( + event.message_id + and self.session_store.has_platform_message_id( + session_entry.session_id, str(event.message_id) + ) ) + if _skip_persist: + logger.info( + "Skipping duplicate user turn " + "(message_id=%s) in session %s", + event.message_id, session_entry.session_id, + ) + else: + self.session_store.append_to_transcript( + session_entry.session_id, + _user_entry, + skip_db=agent_persisted, + ) else: history_len = agent_result.get("history_offset", len(history)) new_messages = agent_messages[history_len:] if len(agent_messages) > history_len else [] diff --git a/gateway/session.py b/gateway/session.py index 8f59d54951e..f79e371d804 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -1460,6 +1460,25 @@ class SessionStore: except Exception as e: logger.debug("Session DB operation failed: %s", e) + def has_platform_message_id( + self, session_id: str, platform_message_id: str + ) -> bool: + """Check if a message with the given platform_message_id is persisted. + + Thin wrapper over SessionDB.has_platform_message_id(). Returns False + when no DB is available (in-memory sessions). Used by the gateway's + transient-failure dedupe guard (#47237). + """ + if not self._db: + return False + try: + return self._db.has_platform_message_id( + session_id, platform_message_id + ) + except Exception: + logger.debug("has_platform_message_id lookup failed", exc_info=True) + return False + def rewrite_transcript(self, session_id: str, messages: List[Dict[str, Any]]) -> None: """Replace the entire transcript for a session with new messages. diff --git a/hermes_state.py b/hermes_state.py index cfb63bd165b..0c77d9c21dd 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -3935,6 +3935,24 @@ class SessionDB: cursor = self._conn.execute("SELECT COUNT(*) FROM messages") return cursor.fetchone()[0] + def has_platform_message_id( + self, session_id: str, platform_message_id: str + ) -> bool: + """Check if a message with the given platform_message_id exists. + + Uses the idx_messages_platform_msg_id partial index for efficient + lookup. Used by the gateway's transient-failure dedupe guard (#47237) + to skip re-persisting a user message that was already saved on a + prior retry of the same inbound platform message. + """ + with self._lock: + cursor = self._conn.execute( + "SELECT 1 FROM messages " + "WHERE session_id = ? AND platform_message_id = ? LIMIT 1", + (session_id, platform_message_id), + ) + return cursor.fetchone() is not None + # ========================================================================= # Export and cleanup # ========================================================================= diff --git a/tests/gateway/test_42039_duplicate_user_message.py b/tests/gateway/test_42039_duplicate_user_message.py index 0f39c74afc0..88f8b8961e9 100644 --- a/tests/gateway/test_42039_duplicate_user_message.py +++ b/tests/gateway/test_42039_duplicate_user_message.py @@ -65,6 +65,9 @@ def _bootstrap(monkeypatch, tmp_path): ) runner.session_store.load_transcript.return_value = [] runner.session_store.append_to_transcript = MagicMock() + # Mock has_platform_message_id to return False so the dedupe guard + # (#47237) in gateway/run.py does not skip the append_to_transcript call. + runner.session_store.has_platform_message_id.return_value = False runner.session_store.update_session = MagicMock() monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) diff --git a/tests/gateway/test_dedupe_user_turns.py b/tests/gateway/test_dedupe_user_turns.py new file mode 100644 index 00000000000..17f66e504be --- /dev/null +++ b/tests/gateway/test_dedupe_user_turns.py @@ -0,0 +1,107 @@ +"""Regression tests for issue #47237. + +When the gateway persists a user message after a transient provider +failure (429/timeout/auth error), subsequent retries of the same +Telegram message must not stack duplicate user turns in the transcript. +The dedupe guard checks has_platform_message_id before persisting. +""" + +from gateway.session import SessionStore +from hermes_state import SessionDB + + +class TestHasPlatformMessageId: + """SessionDB.has_platform_message_id and SessionStore wrapper.""" + + def _make_db(self, tmp_path): + db = SessionDB(tmp_path / "state.db") + db.create_session("s1", "cli") + return db + + def test_returns_false_when_not_present(self, tmp_path): + db = self._make_db(tmp_path) + assert not db.has_platform_message_id("s1", "msg-999") + + def test_returns_true_after_append(self, tmp_path): + db = self._make_db(tmp_path) + db.append_message( + session_id="s1", + role="user", + content="hello", + platform_message_id="msg-123", + ) + assert db.has_platform_message_id("s1", "msg-123") + + def test_returns_false_for_different_session(self, tmp_path): + db = self._make_db(tmp_path) + db.create_session("s2", "cli") + db.append_message( + session_id="s1", + role="user", + content="hello", + platform_message_id="msg-123", + ) + assert not db.has_platform_message_id("s2", "msg-123") + + def test_session_store_wrapper_returns_false_without_db(self, tmp_path): + store = SessionStore.__new__(SessionStore) + store._db = None + assert not store.has_platform_message_id("s1", "msg-123") + + def test_session_store_wrapper_proxies_to_db(self, tmp_path): + db = self._make_db(tmp_path) + db.append_message( + session_id="s1", + role="user", + content="hello", + platform_message_id="msg-456", + ) + store = SessionStore.__new__(SessionStore) + store._db = db + assert store.has_platform_message_id("s1", "msg-456") + assert not store.has_platform_message_id("s1", "msg-000") + + +class TestDedupeOnTransientFailure: + """The gateway's transient-failure path must not persist duplicates.""" + + @staticmethod + def _make_db(tmp_path): + db = SessionDB(tmp_path / "state.db") + db.create_session("s1", "cli") + return db + + def test_duplicate_message_id_skipped(self, tmp_path): + """When has_platform_message_id returns True, the append is skipped.""" + db = self._make_db(tmp_path) + db.append_message( + session_id="s1", + role="user", + content="hello", + platform_message_id="msg-789", + ) + store = SessionStore.__new__(SessionStore) + store._db = db + + # Simulate a second attempt to persist the same message + assert store.has_platform_message_id("s1", "msg-789") + # The gateway code checks this before calling append_to_transcript, + # so the second append should never fire. + + def test_different_message_id_persists(self, tmp_path): + """A new message_id should always be persisted.""" + db = self._make_db(tmp_path) + db.append_message( + session_id="s1", + role="user", + content="first", + platform_message_id="msg-001", + ) + assert not db.has_platform_message_id("s1", "msg-002") + db.append_message( + session_id="s1", + role="user", + content="second", + platform_message_id="msg-002", + ) + assert db.has_platform_message_id("s1", "msg-002")