From ede47a54be046572346a0c9d6de7fc83d8d0ca22 Mon Sep 17 00:00:00 2001
From: karthikeyann
Date: Fri, 15 May 2026 16:40:15 -0500
Subject: [PATCH] fix(gateway): pin Telegram DM-topic routing to user's current topic

Topic-mode DM replies were fragmenting one conversation across many
sessions: a Reply on a message in another topic delivered Telegram's
message_thread_id for *that* topic, and #3206's strip routed plain
replies to the lobby. Both pulled the user away from their current
session.

Fix: when topic mode is on, rewrite source.thread_id to the user's
most-recent binding if the inbound id is missing/General or not a
known topic. Non-topic-mode users unchanged.
---
 gateway/run.py                            |  63 ++++++++++++++
 hermes_state.py                           |  21 +++++
 tests/gateway/test_telegram_topic_mode.py | 101 ++++++++++++++++++++++
 3 files changed, 185 insertions(+)

diff --git a/gateway/run.py b/gateway/run.py
index 94170565033..96c173faf3a 100644
--- a/gateway/run.py
+++ b/gateway/run.py
@@ -1814,6 +1814,54 @@ class GatewayRunner:
                 session_id=session_entry.session_id,
             )
 
+    def _recover_telegram_topic_thread_id(
+        self,
+        source: SessionSource,
+    ) -> Optional[str]:
+        """Pin DM-topic routing to the user's last-active topic.
+
+        Telegram fragments topic-mode DMs two ways: a Reply on a message
+        in another topic delivers ``message_thread_id`` for *that* topic,
+        and ``_build_message_event`` strips the thread_id on plain replies
+        (#3206 — needed for non-topic users). Both route the user to the
+        wrong session. When topic mode is on, rewrite the thread_id to the
+        user's most-recent binding if the inbound id is missing/General or
+        not a known topic for this chat. Returns None to leave it alone.
+        """
+        if (
+            source.platform != Platform.TELEGRAM
+            or source.chat_type != "dm"
+            or not source.chat_id
+            or not source.user_id
+            or not self._telegram_topic_mode_enabled(source)
+        ):
+            return None
+        session_db = getattr(self, "_session_db", None)
+        if session_db is None:
+            return None
+        try:
+            bindings = session_db.list_telegram_topic_bindings_for_chat(
+                chat_id=str(source.chat_id),
+            )
+        except Exception:
+            logger.debug("topic-recover: read failed", exc_info=True)
+            return None
+        if not bindings:
+            return None
+        inbound = str(source.thread_id or "")
+        is_lobby = not inbound or inbound in self._TELEGRAM_GENERAL_TOPIC_IDS
+        known = {str(b.get("thread_id") or "") for b in bindings}
+        if not is_lobby and inbound in known:
+            return None
+        user_id = str(source.user_id)
+        for b in bindings:  # newest-first
+            if str(b.get("user_id") or "") == user_id:
+                recovered = str(b.get("thread_id") or "")
+                if recovered and recovered != inbound:
+                    return recovered
+                return None
+        return None
+
     def _resolve_session_agent_runtime(
         self,
         *,
@@ -7498,6 +7546,21 @@ class GatewayRunner:
         )
 
         # Get or create session
+        # Topic-mode DMs: rewrite a stale/foreign thread_id to the user's
+        # last-active topic so a cross-topic Reply or stripped plain reply
+        # doesn't fragment the conversation across sessions.
+        recovered = self._recover_telegram_topic_thread_id(source)
+        if recovered is not None:
+            logger.info(
+                "telegram topic recovery: chat=%s user=%s %r -> %s",
+                source.chat_id, source.user_id, source.thread_id, recovered,
+            )
+            source = dataclasses.replace(source, thread_id=recovered)
+            try:
+                event.source = source
+            except Exception:  # best-effort; routing below uses the local 'source'
+                logger.debug("topic-recover: could not update event.source", exc_info=True)
+
         session_entry = self.session_store.get_or_create_session(source)
         session_key = session_entry.session_key
         self._cache_session_source(session_key, source)
diff --git a/hermes_state.py b/hermes_state.py
index 51d9f0b406f..e984e653be2 100644
--- a/hermes_state.py
+++ b/hermes_state.py
@@ -2831,6 +2831,27 @@ class SessionDB:
                 return None
         return dict(row) if row else None
 
+    def list_telegram_topic_bindings_for_chat(
+        self,
+        *,
+        chat_id: str,
+    ) -> List[Dict[str, Any]]:
+        """All Telegram DM topic bindings for one chat, newest first.
+
+        Read-only; returns [] if the bindings table doesn't exist yet
+        (does not trigger the topic-mode migration).
+        """
+        with self._lock:
+            try:
+                rows = self._conn.execute(
+                    "SELECT * FROM telegram_dm_topic_bindings "
+                    "WHERE chat_id = ? ORDER BY updated_at DESC",
+                    (str(chat_id),),
+                ).fetchall()
+            except sqlite3.OperationalError:
+                return []
+        return [dict(row) for row in rows]
+
     def bind_telegram_topic(
         self,
         *,
diff --git a/tests/gateway/test_telegram_topic_mode.py b/tests/gateway/test_telegram_topic_mode.py
index eeec2509962..ba37f042678 100644
--- a/tests/gateway/test_telegram_topic_mode.py
+++ b/tests/gateway/test_telegram_topic_mode.py
@@ -1050,5 +1050,106 @@ async def test_topic_refuses_unauthorized_user(tmp_path, monkeypatch):
     assert tables == set()
 
 
+# ──────────────────────────────────────────────────────────────────────
+# Cross-topic Reply leak / stripped-reply recovery
+# ──────────────────────────────────────────────────────────────────────
+
+
+def _seed_two_topic_bindings(session_db):
+    """Create two topics for the same user in topic mode, oldest first."""
+    session_db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988")
+    # Seed two distinct sessions so the bind FK resolves.
+    session_db.create_session(
+        session_id="sess-A",
+        source="telegram",
+        user_id="208214988",
+    )
+    session_db.create_session(
+        session_id="sess-B",
+        source="telegram",
+        user_id="208214988",
+    )
+    # Old topic A first, then current topic B (so B is "most recent").
+    src_a = _make_source(thread_id="111")
+    session_db.bind_telegram_topic(
+        chat_id=src_a.chat_id,
+        thread_id=src_a.thread_id,
+        user_id=src_a.user_id,
+        session_key=build_session_key(src_a),
+        session_id="sess-A",
+    )
+    src_b = _make_source(thread_id="222")
+    session_db.bind_telegram_topic(
+        chat_id=src_b.chat_id,
+        thread_id=src_b.thread_id,
+        user_id=src_b.user_id,
+        session_key=build_session_key(src_b),
+        session_id="sess-B",
+    )
+
+
+def test_recover_returns_none_for_known_topic(tmp_path):
+    db = SessionDB(db_path=tmp_path / "state.db")
+    _seed_two_topic_bindings(db)
+    runner = _make_runner(session_db=db)
+
+    assert runner._recover_telegram_topic_thread_id(_make_source(thread_id="222")) is None
+
+
+def test_recover_rewrites_unknown_thread_id_to_most_recent(tmp_path):
+    # Cross-topic Reply leak: inbound thread_id is a Telegram-only id we never bound.
+    db = SessionDB(db_path=tmp_path / "state.db")
+    _seed_two_topic_bindings(db)
+    runner = _make_runner(session_db=db)
+
+    assert runner._recover_telegram_topic_thread_id(_make_source(thread_id="9999")) == "222"
+
+
+def test_recover_rewrites_lobby_thread_id_to_most_recent(tmp_path):
+    # Stripped plain reply: thread_id is None, topic mode is on.
+    db = SessionDB(db_path=tmp_path / "state.db")
+    _seed_two_topic_bindings(db)
+    runner = _make_runner(session_db=db)
+
+    assert runner._recover_telegram_topic_thread_id(_make_source(thread_id=None)) == "222"
+
+
+def test_recover_returns_none_when_topic_mode_disabled(tmp_path):
+    # Non-topic-mode DMs keep the existing strip-to-lobby behavior.
+    db = SessionDB(db_path=tmp_path / "state.db")
+    runner = _make_runner(session_db=db)
+
+    assert runner._recover_telegram_topic_thread_id(_make_source(thread_id=None)) is None
+
+
+def test_recover_returns_none_when_no_bindings_yet(tmp_path):
+    db = SessionDB(db_path=tmp_path / "state.db")
+    db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988")
+    runner = _make_runner(session_db=db)
+
+    assert runner._recover_telegram_topic_thread_id(_make_source(thread_id=None)) is None
+
+
+def test_list_telegram_topic_bindings_for_chat(tmp_path):
+    db = SessionDB(db_path=tmp_path / "state.db")
+    _seed_two_topic_bindings(db)
+    rows = db.list_telegram_topic_bindings_for_chat(chat_id="208214988")
+    assert [r["thread_id"] for r in rows] == ["222", "111"]
+
+
+def test_list_telegram_topic_bindings_for_chat_no_table(tmp_path):
+    # Missing topic-mode tables → [] without auto-migrating.
+    db = SessionDB(db_path=tmp_path / "state.db")
+    assert db.list_telegram_topic_bindings_for_chat(chat_id="208214988") == []
+    tables = {
+        row[0]
+        for row in db._conn.execute(
+            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'telegram_dm%'"
+        ).fetchall()
+    }
+    assert tables == set()
+
+
+
 
 