diff --git a/gateway/run.py b/gateway/run.py index d26b679b182..99e2d9de9f6 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -16544,6 +16544,37 @@ class GatewayRunner: entry.session_id = agent.session_id self.session_store._save() + # If this is a Telegram DM and source.thread_id was lost during + # the session split (synthetic / recovered event), restore it + # from the binding so _thread_metadata_for_source produces the + # correct message_thread_id instead of routing to the General + # thread. Failure here is non-fatal — we log and continue; + # worst case the message lands in General, which is the + # pre-fix behaviour. + if ( + getattr(source, "platform", None) == Platform.TELEGRAM + and getattr(source, "chat_type", None) == "dm" + and getattr(source, "thread_id", None) is None + and self._session_db is not None + ): + try: + _binding = self._session_db.get_telegram_topic_binding_by_session( + session_id=agent.session_id, + ) + if _binding and _binding.get("thread_id"): + source.thread_id = str(_binding["thread_id"]) + logger.debug( + "Restored source.thread_id=%s from binding after session split %s → %s", + source.thread_id, + session_id, + agent.session_id, + ) + except Exception: + logger.debug( + "Failed to restore thread_id from binding after session split", + exc_info=True, + ) + effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id # When compression created a new session, the messages list was diff --git a/hermes_state.py b/hermes_state.py index e984e653be2..e8e8947c05a 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -2852,6 +2852,30 @@ class SessionDB: return [] return [dict(row) for row in rows] + def get_telegram_topic_binding_by_session( + self, + *, + session_id: str, + ) -> Optional[Dict[str, Any]]: + """Return the Telegram DM topic binding for a given session_id, if present. + + Uses the UNIQUE INDEX on telegram_dm_topic_bindings(session_id) for an + efficient reverse lookup. Returns None when the session has no binding or + the table does not exist yet. + """ + with self._lock: + try: + row = self._conn.execute( + """ + SELECT * FROM telegram_dm_topic_bindings + WHERE session_id = ? + """, + (str(session_id),), + ).fetchone() + except sqlite3.OperationalError: + return None + return dict(row) if row else None + def bind_telegram_topic( self, *, diff --git a/tests/gateway/test_telegram_topic_mode.py b/tests/gateway/test_telegram_topic_mode.py index 1a577bfd486..7945fb716b0 100644 --- a/tests/gateway/test_telegram_topic_mode.py +++ b/tests/gateway/test_telegram_topic_mode.py @@ -1229,6 +1229,100 @@ def test_list_telegram_topic_bindings_for_chat_no_table(tmp_path): assert tables == set() +# --------------------------------------------------------------------------- +# Tests for get_telegram_topic_binding_by_session (issue #27166) +# --------------------------------------------------------------------------- + +def test_get_telegram_topic_binding_by_session_returns_binding(tmp_path): + """Reverse lookup by session_id returns the binding row.""" + db = SessionDB(db_path=tmp_path / "state.db") + db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988") + db.create_session(session_id="sess-27166", source="telegram", user_id="208214988") + db.bind_telegram_topic( + chat_id="208214988", + thread_id="17585", + user_id="208214988", + session_key="agent:main:telegram:dm:208214988:17585", + session_id="sess-27166", + ) + + binding = db.get_telegram_topic_binding_by_session(session_id="sess-27166") + + assert binding is not None + assert binding["chat_id"] == "208214988" + assert binding["thread_id"] == "17585" + assert binding["session_id"] == "sess-27166" +def test_get_telegram_topic_binding_by_session_returns_none_for_unknown(tmp_path): + """Returns None when no binding exists for the given session_id.""" + db = SessionDB(db_path=tmp_path / "state.db") + db.apply_telegram_topic_migration() + result = db.get_telegram_topic_binding_by_session(session_id="nonexistent-sess") + + assert result is None + + +# --------------------------------------------------------------------------- +# Test for session-split thread_id recovery (issue #27166) +# --------------------------------------------------------------------------- + +def test_session_split_restores_source_thread_id_from_binding(tmp_path): + """After a session split, source.thread_id is restored from the binding. + + Simulates the case where context compression creates a new session_id and + source.thread_id is None (synthetic/recovered event). The recovery block + must look up the binding by the new session_id and restore thread_id on + source so that _thread_metadata_for_source returns the correct thread. + """ + from gateway.run import GatewayRunner + from gateway.config import Platform + + db = SessionDB(db_path=tmp_path / "state.db") + db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988") + db.create_session(session_id="sess-split-new", source="telegram", user_id="208214988") + db.bind_telegram_topic( + chat_id="208214988", + thread_id="17585", + user_id="208214988", + session_key="agent:main:telegram:dm:208214988:17585", + session_id="sess-split-new", + ) + + runner = object.__new__(GatewayRunner) + runner._session_db = db + + # Build a source that looks like it came from a synthetic/recovered event: + # platform and chat_type match a Telegram DM, but thread_id is None. + source = _make_source(thread_id=None) + assert source.platform == Platform.TELEGRAM + assert source.chat_type == "dm" + assert source.thread_id is None + + # Simulate the session-split recovery block logic directly. + if ( + getattr(source, "platform", None) == Platform.TELEGRAM + and getattr(source, "chat_type", None) == "dm" + and getattr(source, "thread_id", None) is None + and runner._session_db is not None + ): + try: + _binding = runner._session_db.get_telegram_topic_binding_by_session( + session_id="sess-split-new", + ) + if _binding and _binding.get("thread_id"): + source.thread_id = str(_binding["thread_id"]) + except Exception: + pass + + assert source.thread_id == "17585", ( + "thread_id must be restored from the binding after session split" + ) + + # Confirm _thread_metadata_for_source now returns non-None. + runner.config = _make_runner(session_db=db).config + runner.adapters = _make_runner(session_db=db).adapters + meta = GatewayRunner._thread_metadata_for_source(runner, source) + assert meta is not None + assert meta["thread_id"] == "17585"