fix(gateway): pin Telegram DM-topic routing to user's current topic

Topic-mode DM replies were fragmenting one conversation across many sessions: a Reply on a message in another topic delivered Telegram's message_thread_id for *that* topic, and #3206's strip routed plain replies to the lobby. Both pulled the user away from their current session. Fix: when topic mode is on, rewrite source.thread_id to the user's most-recent binding if the inbound id is missing/General or not a known topic. Non-topic-mode users unchanged.
This commit is contained in:
karthikeyann 2026-05-15 16:40:15 -05:00 committed by Teknium
parent 470edfa901
commit ede47a54be
3 changed files with 185 additions and 0 deletions

View file

@ -1814,6 +1814,54 @@ class GatewayRunner:
session_id=session_entry.session_id,
)
def _recover_telegram_topic_thread_id(
self,
source: SessionSource,
) -> Optional[str]:
"""Pin DM-topic routing to the user's last-active topic.
Telegram fragments topic-mode DMs two ways: a Reply on a message
in another topic delivers ``message_thread_id`` for *that* topic,
and ``_build_message_event`` strips the thread_id on plain replies
(#3206 — needed for non-topic users). Both route the user to the
wrong session. When topic mode is on, rewrite the thread_id to the
user's most-recent binding if the inbound id is missing/General or
not a known topic for this chat. Returns None to leave it alone.
"""
if (
source.platform != Platform.TELEGRAM
or source.chat_type != "dm"
or not source.chat_id
or not source.user_id
or not self._telegram_topic_mode_enabled(source)
):
return None
session_db = getattr(self, "_session_db", None)
if session_db is None:
return None
try:
bindings = session_db.list_telegram_topic_bindings_for_chat(
chat_id=str(source.chat_id),
)
except Exception:
logger.debug("topic-recover: read failed", exc_info=True)
return None
if not bindings:
return None
inbound = str(source.thread_id or "")
is_lobby = not inbound or inbound in self._TELEGRAM_GENERAL_TOPIC_IDS
known = {str(b.get("thread_id") or "") for b in bindings}
if not is_lobby and inbound in known:
return None
user_id = str(source.user_id)
for b in bindings: # newest-first
if str(b.get("user_id") or "") == user_id:
recovered = str(b.get("thread_id") or "")
if recovered and recovered != inbound:
return recovered
return None
return None
def _resolve_session_agent_runtime(
self,
*,
@ -7498,6 +7546,21 @@ class GatewayRunner:
)
# Get or create session
# Topic-mode DMs: rewrite a stale/foreign thread_id to the user's
# last-active topic so a cross-topic Reply or stripped plain reply
# doesn't fragment the conversation across sessions.
recovered = self._recover_telegram_topic_thread_id(source)
if recovered is not None:
logger.info(
"telegram topic recovery: chat=%s user=%s %r -> %s",
source.chat_id, source.user_id, source.thread_id, recovered,
)
source = dataclasses.replace(source, thread_id=recovered)
try:
event.source = source
except Exception:
pass
session_entry = self.session_store.get_or_create_session(source)
session_key = session_entry.session_key
self._cache_session_source(session_key, source)

View file

@ -2831,6 +2831,27 @@ class SessionDB:
return None
return dict(row) if row else None
def list_telegram_topic_bindings_for_chat(
self,
*,
chat_id: str,
) -> List[Dict[str, Any]]:
"""All Telegram DM topic bindings for one chat, newest first.
Read-only; returns [] if the bindings table doesn't exist yet
(does not trigger the topic-mode migration).
"""
with self._lock:
try:
rows = self._conn.execute(
"SELECT * FROM telegram_dm_topic_bindings "
"WHERE chat_id = ? ORDER BY updated_at DESC",
(str(chat_id),),
).fetchall()
except sqlite3.OperationalError:
return []
return [dict(row) for row in rows]
def bind_telegram_topic(
self,
*,

View file

@ -1050,5 +1050,106 @@ async def test_topic_refuses_unauthorized_user(tmp_path, monkeypatch):
assert tables == set()
# ──────────────────────────────────────────────────────────────────────
# Cross-topic Reply leak / stripped-reply recovery
# ──────────────────────────────────────────────────────────────────────
def _seed_two_topic_bindings(session_db):
"""Create two topics for the same user in topic mode, oldest first."""
session_db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988")
# Seed two distinct sessions so the bind FK resolves.
session_db.create_session(
session_id="sess-A",
source="telegram",
user_id="208214988",
)
session_db.create_session(
session_id="sess-B",
source="telegram",
user_id="208214988",
)
# Old topic A first, then current topic B (so B is "most recent").
src_a = _make_source(thread_id="111")
session_db.bind_telegram_topic(
chat_id=src_a.chat_id,
thread_id=src_a.thread_id,
user_id=src_a.user_id,
session_key=build_session_key(src_a),
session_id="sess-A",
)
src_b = _make_source(thread_id="222")
session_db.bind_telegram_topic(
chat_id=src_b.chat_id,
thread_id=src_b.thread_id,
user_id=src_b.user_id,
session_key=build_session_key(src_b),
session_id="sess-B",
)
def test_recover_returns_none_for_known_topic(tmp_path):
db = SessionDB(db_path=tmp_path / "state.db")
_seed_two_topic_bindings(db)
runner = _make_runner(session_db=db)
assert runner._recover_telegram_topic_thread_id(_make_source(thread_id="222")) is None
def test_recover_rewrites_unknown_thread_id_to_most_recent(tmp_path):
# Cross-topic Reply leak: inbound thread_id is a Telegram-only id we never bound.
db = SessionDB(db_path=tmp_path / "state.db")
_seed_two_topic_bindings(db)
runner = _make_runner(session_db=db)
assert runner._recover_telegram_topic_thread_id(_make_source(thread_id="9999")) == "222"
def test_recover_rewrites_lobby_thread_id_to_most_recent(tmp_path):
# Stripped plain reply: thread_id is None, topic mode is on.
db = SessionDB(db_path=tmp_path / "state.db")
_seed_two_topic_bindings(db)
runner = _make_runner(session_db=db)
assert runner._recover_telegram_topic_thread_id(_make_source(thread_id=None)) == "222"
def test_recover_returns_none_when_topic_mode_disabled(tmp_path):
# Non-topic-mode DMs keep the existing strip-to-lobby behavior.
db = SessionDB(db_path=tmp_path / "state.db")
runner = _make_runner(session_db=db)
assert runner._recover_telegram_topic_thread_id(_make_source(thread_id=None)) is None
def test_recover_returns_none_when_no_bindings_yet(tmp_path):
db = SessionDB(db_path=tmp_path / "state.db")
db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988")
runner = _make_runner(session_db=db)
assert runner._recover_telegram_topic_thread_id(_make_source(thread_id=None)) is None
def test_list_telegram_topic_bindings_for_chat(tmp_path):
db = SessionDB(db_path=tmp_path / "state.db")
_seed_two_topic_bindings(db)
rows = db.list_telegram_topic_bindings_for_chat(chat_id="208214988")
assert [r["thread_id"] for r in rows] == ["222", "111"]
def test_list_telegram_topic_bindings_for_chat_no_table(tmp_path):
# Missing topic-mode tables → [] without auto-migrating.
db = SessionDB(db_path=tmp_path / "state.db")
assert db.list_telegram_topic_bindings_for_chat(chat_id="208214988") == []
tables = {
row[0]
for row in db._conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'telegram_dm%'"
).fetchall()
}
assert tables == set()