diff --git a/gateway/run.py b/gateway/run.py index f575496e12a..96ed2a388a8 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2303,6 +2303,32 @@ class GatewayRunner: session_id=session_entry.session_id, ) + def _sync_telegram_topic_binding( + self, + source: SessionSource, + session_entry, + *, + reason: str, + ) -> None: + """Update the topic binding to point at ``session_entry.session_id``. + + Telegram topic lanes persist a (chat_id, thread_id) -> session_id row + so reopening a topic in a fresh process resumes the right Hermes + session. When compression rotates ``session_entry.session_id`` mid-turn, + the binding goes stale and the next inbound message in that topic + reloads the oversized parent transcript instead of the compressed + child, retriggering preflight compression — sometimes in a loop + (#20470, #29712, #33414). + """ + if not self._is_telegram_topic_lane(source): + return + try: + self._record_telegram_topic_binding(source, session_entry) + except Exception: + logger.debug( + "telegram topic binding refresh failed (%s)", reason, exc_info=True, + ) + def _recover_telegram_topic_thread_id( self, source: SessionSource, @@ -8279,6 +8305,28 @@ class GatewayRunner: binding = None if binding: bound_session_id = str(binding.get("session_id") or "") + # Heal bindings that point at a pre-compression parent: walk + # the compression-continuation chain forward to its tip so the + # next message resumes the compressed child instead of + # reloading the oversized parent transcript (#20470/#29712/ + # #33414). Returns the input unchanged when the session isn't + # a compression parent, so this is cheap and safe. + if bound_session_id and self._session_db is not None: + try: + canonical_session_id = self._session_db.get_compression_tip( + bound_session_id, + ) + except Exception: + logger.debug( + "compression-tip lookup failed for %s", + bound_session_id, exc_info=True, + ) + canonical_session_id = bound_session_id + if ( + canonical_session_id + and canonical_session_id != bound_session_id + ): + bound_session_id = canonical_session_id if bound_session_id and bound_session_id != session_entry.session_id: # Route the override through SessionStore so the session_key # → session_id mapping is persisted to disk and the previous @@ -8288,6 +8336,15 @@ class GatewayRunner: switched = self.session_store.switch_session(session_key, bound_session_id) if switched is not None: session_entry = switched + # If the stored binding pointed at a parent, rewrite it to the + # canonical descendant now that we've followed the chain. + if ( + bound_session_id + and bound_session_id != str(binding.get("session_id") or "") + ): + self._sync_telegram_topic_binding( + source, session_entry, reason="compression-tip-walk", + ) else: try: self._record_telegram_topic_binding(source, session_entry) @@ -8664,6 +8721,10 @@ class GatewayRunner: if _hyg_new_sid != session_entry.session_id: session_entry.session_id = _hyg_new_sid self.session_store._save() + self._sync_telegram_topic_binding( + source, session_entry, + reason="hygiene-compression", + ) self.session_store.rewrite_transcript( session_entry.session_id, _compressed @@ -8929,6 +8990,9 @@ class GatewayRunner: if agent_result.get("session_id") and agent_result["session_id"] != session_entry.session_id: session_entry.session_id = agent_result["session_id"] self.session_store._save() + self._sync_telegram_topic_binding( + source, session_entry, reason="agent-result-compression", + ) # Prepend reasoning/thinking if display is enabled (per-platform) try: @@ -12373,6 +12437,9 @@ class GatewayRunner: if new_session_id != session_entry.session_id: session_entry.session_id = new_session_id self.session_store._save() + self._sync_telegram_topic_binding( + source, session_entry, reason="compress-command", + ) self.session_store.rewrite_transcript(new_session_id, compressed) # Reset stored token count — transcript changed, old value is stale diff --git a/tests/gateway/test_telegram_topic_mode.py b/tests/gateway/test_telegram_topic_mode.py index 50f315ba7a8..c887153508c 100644 --- a/tests/gateway/test_telegram_topic_mode.py +++ b/tests/gateway/test_telegram_topic_mode.py @@ -448,6 +448,89 @@ async def test_new_inside_telegram_topic_rewrites_binding_to_new_session(tmp_pat assert binding["session_id"] == "new-topic-session" +@pytest.mark.asyncio +async def test_topic_binding_follows_compression_tip_on_read(tmp_path, monkeypatch): + """Stale topic bindings auto-heal to the compression child on next inbound. + + Regression for #20470 / #29712 / #33414. After compression rotates the + session_id, the binding row still pointed at the parent. On the next + inbound message in that topic, the gateway used to reload the oversized + parent transcript and re-run preflight compression — sometimes in a loop. + The read path now walks ``SessionDB.get_compression_tip()`` and rewrites + the binding to the descendant. + """ + import gateway.run as gateway_run + + session_db = SessionDB(db_path=tmp_path / "state.db") + session_db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988") + # Build a parent -> compression child chain. end_session sets ended_at; + # create_session sets started_at to "now", so the child's started_at is + # always >= parent's ended_at on a real clock. + session_db.create_session( + session_id="parent-session", source="telegram", user_id="208214988", + ) + session_db.end_session("parent-session", end_reason="compression") + session_db.create_session( + session_id="child-session", + source="telegram", + user_id="208214988", + parent_session_id="parent-session", + ) + topic_source = _make_source(thread_id="17585") + topic_key = build_session_key(topic_source) + # Pre-bug binding: topic still pointed at the pre-compression parent. + session_db.bind_telegram_topic( + chat_id="208214988", + thread_id="17585", + user_id="208214988", + session_key=topic_key, + session_id="parent-session", + ) + + runner = _make_runner(session_db=session_db) + # switch_session() returns a SessionEntry pointing at whatever id was + # requested; capture the requested id for assertion. + switched_to: dict = {} + + def fake_switch(_key, new_session_id): + switched_to["id"] = new_session_id + return SessionEntry( + session_key=topic_key, + session_id=new_session_id, + created_at=datetime.now(), + updated_at=datetime.now(), + platform=Platform.TELEGRAM, + chat_type="dm", + origin=topic_source, + ) + + runner.session_store.switch_session = MagicMock(side_effect=fake_switch) + runner._run_agent = AsyncMock( + return_value={ + "success": True, + "final_response": "ok", + "session_id": "child-session", + "messages": [], + } + ) + + monkeypatch.setattr( + gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"} + ) + + await runner._handle_message(_make_event("follow up after compression", thread_id="17585")) + + # The route was advanced to the compression tip, not the stale parent. + assert switched_to.get("id") == "child-session" + # The binding row was rewritten to point at the descendant so future + # inbound messages skip the tip walk and resolve directly. + refreshed = session_db.get_telegram_topic_binding( + chat_id="208214988", thread_id="17585", + ) + assert refreshed is not None + assert refreshed["session_id"] == "child-session" + + @pytest.mark.asyncio async def test_topic_root_command_explicitly_migrates_and_enables_topic_mode(tmp_path, monkeypatch): import gateway.run as gateway_run