fix(telegram): harden DM topic binding — persist through switch_session, rebind on /new

Follow-up on @EmelyanenkoK's feat: add Telegram DM topic-mode sessions.

Three issues:

1. Split-brain session state. After get_or_create_session() returned a
   SessionEntry for a topic lane, the handler was mutating
   .session_id in place to the binding's target, but never persisting
   the switch through SessionStore. The sessions.json session_key →
   session_id map kept pointing at the lane's natural id; any reader
   that reloaded from disk saw the wrong id. Fixed by routing through
   SessionStore.switch_session(), which _save()s the mapping and ends
   the old session in SQLite like /resume does.

2. /new inside a topic was a one-message no-op. Reset created a new
   session but left the telegram_dm_topic_bindings row pointing at the
   old session_id, so the next message's binding lookup switched right
   back. Now _handle_reset_command rebinds the topic to the new
   session_id after reset.

3. is_telegram_session_linked_to_topic and
   list_unlinked_telegram_sessions_for_user both called
   apply_telegram_topic_migration() on read, contradicting the PR's
   own invariant that migration only runs on explicit /topic opt-in.
   They now tolerate missing topic tables and return empty/False.

Also: _telegram_topic_mode_enabled() now only treats True as enabled
(not any truthy return), so test fixtures with MagicMock session_db
don't accidentally flip every DM into lobby mode — this was breaking
4 pre-existing test_status_command tests.

Tests:
- New regression: /new inside a topic must update the binding row
  (test_new_inside_telegram_topic_rewrites_binding_to_new_session).
- _make_runner now stubs switch_session so existing restore tests
  still exercise the new code path.

Validated end-to-end with real SessionDB + SessionStore:
readers on fresh DB don't create topic tables; enable creates them;
binding override persists across SessionStore restart; /new rebinds
and the new id survives a restart.

Co-authored-by: EmelyanenkoK <emelyanenko.kirill@gmail.com>
This commit is contained in:
teknium1 2026-05-03 05:34:07 -07:00 committed by Teknium
parent 25065283b3
commit a7683d04a9
4 changed files with 184 additions and 44 deletions

View file

@ -1463,15 +1463,17 @@ class GatewayRunner:
if session_db is None: if session_db is None:
return False return False
try: try:
return bool( raw = session_db.is_telegram_topic_mode_enabled(
session_db.is_telegram_topic_mode_enabled(
chat_id=str(source.chat_id), chat_id=str(source.chat_id),
user_id=str(source.user_id), user_id=str(source.user_id),
) )
)
except Exception: except Exception:
logger.debug("Failed to read Telegram topic mode state", exc_info=True) logger.debug("Failed to read Telegram topic mode state", exc_info=True)
return False return False
# Only honor a real True from the SessionDB. Any other value
# (including MagicMock instances from test fixtures that didn't
# opt into topic mode) means topic mode is off for this chat.
return raw is True
def _is_telegram_topic_root_lobby(self, source: SessionSource) -> bool: def _is_telegram_topic_root_lobby(self, source: SessionSource) -> bool:
"""True for the main Telegram DM when topic mode has made it a lobby.""" """True for the main Telegram DM when topic mode has made it a lobby."""
@ -5902,7 +5904,16 @@ class GatewayRunner:
logger.debug("Failed to read Telegram topic binding", exc_info=True) logger.debug("Failed to read Telegram topic binding", exc_info=True)
binding = None binding = None
if binding: if binding:
session_entry.session_id = str(binding.get("session_id") or session_entry.session_id) bound_session_id = str(binding.get("session_id") or "")
if bound_session_id and bound_session_id != session_entry.session_id:
# Route the override through SessionStore so the session_key
# → session_id mapping is persisted to disk and the previous
# lane session is ended cleanly. Mutating session_entry in
# place here created a split-brain state where the JSON
# index pointed at one id but code downstream used another.
switched = self.session_store.switch_session(session_key, bound_session_id)
if switched is not None:
session_entry = switched
else: else:
try: try:
self._record_telegram_topic_binding(source, session_entry) self._record_telegram_topic_binding(source, session_entry)
@ -7123,6 +7134,17 @@ class GatewayRunner:
_title_note = "\n⚠️ Title is empty after cleanup — session started untitled." _title_note = "\n⚠️ Title is empty after cleanup — session started untitled."
header = header + _title_note header = header + _title_note
# When /new runs inside a Telegram DM topic lane, rewrite the
# (chat_id, thread_id) → session_id binding so the next message
# uses the freshly-created session. Without this, the binding
# still points at the old session and the binding-lookup at the
# top of _handle_message_with_agent would switch right back.
if self._is_telegram_topic_lane(source) and new_entry is not None:
try:
self._record_telegram_topic_binding(source, new_entry)
except Exception:
logger.debug("Failed to rebind Telegram topic after /new", exc_info=True)
# Fire plugin on_session_reset hook (new session guaranteed to exist) # Fire plugin on_session_reset hook (new session guaranteed to exist)
try: try:
from hermes_cli.plugins import invoke_hook as _invoke_hook from hermes_cli.plugins import invoke_hook as _invoke_hook

View file

@ -2350,8 +2350,15 @@ class SessionDB:
self._execute_write(_do) self._execute_write(_do)
def is_telegram_session_linked_to_topic(self, *, session_id: str) -> bool: def is_telegram_session_linked_to_topic(self, *, session_id: str) -> bool:
"""Return True if a Hermes session is already bound to any Telegram DM topic.""" """Return True if a Hermes session is already bound to any Telegram DM topic.
self.apply_telegram_topic_migration()
Read-only: does NOT trigger the telegram-topic migration. If the
topic-mode tables have not been created yet (i.e. nobody has run
``/topic`` in this profile), the session is by definition unbound
and we return False.
"""
with self._lock:
try:
row = self._conn.execute( row = self._conn.execute(
""" """
SELECT 1 FROM telegram_dm_topic_bindings SELECT 1 FROM telegram_dm_topic_bindings
@ -2360,6 +2367,8 @@ class SessionDB:
""", """,
(str(session_id),), (str(session_id),),
).fetchone() ).fetchone()
except sqlite3.OperationalError:
return False
return row is not None return row is not None
def list_unlinked_telegram_sessions_for_user( def list_unlinked_telegram_sessions_for_user(
@ -2369,9 +2378,15 @@ class SessionDB:
user_id: str, user_id: str,
limit: int = 10, limit: int = 10,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
"""List previous Telegram sessions for this user that are not bound to a topic.""" """List previous Telegram sessions for this user that are not bound to a topic.
self.apply_telegram_topic_migration()
Read-only: does NOT trigger the telegram-topic migration. If the
topic-mode tables are absent, fall back to a simpler query that
just returns this user's Telegram sessions — there can't be any
bindings yet.
"""
with self._lock: with self._lock:
try:
rows = self._conn.execute( rows = self._conn.execute(
""" """
SELECT s.*, SELECT s.*,
@ -2398,6 +2413,31 @@ class SessionDB:
""", """,
(str(user_id), int(limit)), (str(user_id), int(limit)),
).fetchall() ).fetchall()
except sqlite3.OperationalError:
# telegram_dm_topic_bindings doesn't exist yet — no bindings
# means every telegram session for this user is "unlinked".
rows = self._conn.execute(
"""
SELECT s.*,
COALESCE(
(SELECT SUBSTR(REPLACE(REPLACE(m.content, X'0A', ' '), X'0D', ' '), 1, 63)
FROM messages m
WHERE m.session_id = s.id AND m.role = 'user' AND m.content IS NOT NULL
ORDER BY m.timestamp, m.id LIMIT 1),
''
) AS _preview_raw,
COALESCE(
(SELECT MAX(m2.timestamp) FROM messages m2 WHERE m2.session_id = s.id),
s.started_at
) AS last_active
FROM sessions s
WHERE s.source = 'telegram'
AND s.user_id = ?
ORDER BY last_active DESC, s.started_at DESC
LIMIT ?
""",
(str(user_id), int(limit)),
).fetchall()
sessions: List[Dict[str, Any]] = [] sessions: List[Dict[str, Any]] = []
for row in rows: for row in rows:

View file

@ -55,6 +55,7 @@ AUTHOR_MAP = {
"14046872+tmimmanuel@users.noreply.github.com": "tmimmanuel", "14046872+tmimmanuel@users.noreply.github.com": "tmimmanuel",
"657290301@qq.com": "IMHaoyan", "657290301@qq.com": "IMHaoyan",
"revar@users.noreply.github.com": "revaraver", "revar@users.noreply.github.com": "revaraver",
"emelyanenko.kirill@gmail.com": "EmelyanenkoK",
# Matrix parity salvage batch (April 2026) # Matrix parity salvage batch (April 2026)
"sr@samirusani": "samrusani", "sr@samirusani": "samrusani",
"angelclaw@AngelMacBook.local": "angel12", "angelclaw@AngelMacBook.local": "angel12",

View file

@ -100,6 +100,21 @@ def _make_runner(session_db=None):
runner.session_store.rewrite_transcript = MagicMock() runner.session_store.rewrite_transcript = MagicMock()
runner.session_store.update_session = MagicMock() runner.session_store.update_session = MagicMock()
runner.session_store.reset_session = MagicMock(return_value=None) runner.session_store.reset_session = MagicMock(return_value=None)
# Default switch_session impl: returns a SessionEntry carrying the target
# session_id. Mirrors SessionStore.switch_session semantics for tests that
# exercise Telegram topic binding rebinds without a real store.
def _switch_session(session_key, target_session_id):
return SessionEntry(
session_key=session_key,
session_id=target_session_id,
created_at=datetime.now(),
updated_at=datetime.now(),
platform=Platform.TELEGRAM,
chat_type="dm",
origin=None,
)
runner.session_store.switch_session = MagicMock(side_effect=_switch_session)
runner._running_agents = {} runner._running_agents = {}
runner._running_agents_ts = {} runner._running_agents_ts = {}
runner._pending_messages = {} runner._pending_messages = {}
@ -366,6 +381,68 @@ async def test_new_inside_telegram_topic_resets_current_topic_with_parallel_tip(
runner.session_store.reset_session.assert_called_once_with(topic_key) runner.session_store.reset_session.assert_called_once_with(topic_key)
@pytest.mark.asyncio
async def test_new_inside_telegram_topic_rewrites_binding_to_new_session(tmp_path, monkeypatch):
"""Regression: /new inside a topic must rewrite the binding table.
Previously /new reset the SessionStore entry but the
telegram_dm_topic_bindings row still pointed at the old session_id;
the next inbound message would look up the stale binding and switch
back to the old session, making /new a no-op.
"""
import gateway.run as gateway_run
session_db = SessionDB(db_path=tmp_path / "state.db")
session_db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988")
session_db.create_session(
session_id="old-topic-session",
source="telegram",
user_id="208214988",
)
topic_source = _make_source(thread_id="17585")
topic_key = build_session_key(topic_source)
session_db.bind_telegram_topic(
chat_id="208214988",
thread_id="17585",
user_id="208214988",
session_key=topic_key,
session_id="old-topic-session",
)
runner = _make_runner(session_db=session_db)
new_entry = SessionEntry(
session_key=topic_key,
session_id="new-topic-session",
created_at=datetime.now(),
updated_at=datetime.now(),
platform=Platform.TELEGRAM,
chat_type="dm",
origin=topic_source,
)
# Mirror SessionStore.reset_session: in production it calls
# SessionDB.create_session() for the new id before returning, so the
# bindings FK can reference it.
session_db.create_session(
session_id="new-topic-session",
source="telegram",
user_id="208214988",
)
runner.session_store.reset_session.return_value = new_entry
runner._agent_cache_lock = None
monkeypatch.setattr(
gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}
)
await runner._handle_message(_make_event("/new", thread_id="17585"))
binding = session_db.get_telegram_topic_binding(
chat_id="208214988", thread_id="17585",
)
assert binding is not None
assert binding["session_id"] == "new-topic-session"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_topic_root_command_explicitly_migrates_and_enables_topic_mode(tmp_path, monkeypatch): async def test_topic_root_command_explicitly_migrates_and_enables_topic_mode(tmp_path, monkeypatch):
import gateway.run as gateway_run import gateway.run as gateway_run