From 9a815b6c8ca7080ac01ba04d5f195c52542c7952 Mon Sep 17 00:00:00 2001 From: Kong Date: Thu, 14 May 2026 02:28:33 +0800 Subject: [PATCH] fix(gateway): preserve queued follow-up transcript history Keep the outer history_offset when _run_agent drains queued follow-ups recursively so transcript persistence includes every queued turn in the chain instead of only the last one. --- gateway/run.py | 35 +++++++++++++++- tests/gateway/test_transcript_offset.py | 55 +++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/gateway/run.py b/gateway/run.py index 46c508e4bde..4946a7e6c1e 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1139,6 +1139,38 @@ def _should_clear_resume_pending_after_turn(agent_result: dict) -> bool: return True +def _preserve_queued_followup_history_offset( + current_result: dict, + followup_result: dict, +) -> dict: + """Carry the outer history offset through queued follow-up drains. + + ``_process_message_background()`` persists transcript rows only once, after the + entire in-band queued-follow-up chain returns. Each recursive ``_run_agent()`` + call advances ``history_offset`` to the history it received, so without + correction the outermost persistence step sees only the *last* queued turn as + "new" and silently drops earlier turns from the same drain chain. + + Preserve the earliest (outermost) history offset so the final transcript slice + still includes every queued turn that ran during the chain. + """ + if not isinstance(followup_result, dict): + return followup_result + if not isinstance(current_result, dict): + return followup_result + + current_offset = current_result.get("history_offset") + followup_offset = followup_result.get("history_offset") + if not isinstance(current_offset, int): + return followup_result + if isinstance(followup_offset, int) and followup_offset <= current_offset: + return followup_result + + merged = dict(followup_result) + merged["history_offset"] = current_offset + return merged + + class GatewayRunner: """ Main gateway controller. @@ -16042,7 +16074,7 @@ class GatewayRunner: except Exception: pass - return await self._run_agent( + followup_result = await self._run_agent( message=next_message, context_prompt=context_prompt, history=updated_history, @@ -16054,6 +16086,7 @@ class GatewayRunner: event_message_id=next_message_id, channel_prompt=next_channel_prompt, ) + return _preserve_queued_followup_history_offset(result, followup_result) finally: # Stop progress sender, interrupt monitor, and notification task if progress_task: diff --git a/tests/gateway/test_transcript_offset.py b/tests/gateway/test_transcript_offset.py index 27c96ad4b2c..c13e5eb1000 100644 --- a/tests/gateway/test_transcript_offset.py +++ b/tests/gateway/test_transcript_offset.py @@ -14,6 +14,8 @@ to ``_run_agent``'s return dict and uses it for the slice. import pytest +from gateway.run import _preserve_queued_followup_history_offset + # --------------------------------------------------------------------------- # Helpers - replicate the filtering logic from _run_agent @@ -265,3 +267,56 @@ class TestTranscriptHistoryOffset: assert len(fixed_new) == 2 assert fixed_new[0]["content"] == "Now search for dogs" assert fixed_new[1]["content"] == "Dog results here." + + def test_recursive_queued_followup_keeps_outer_history_offset(self): + """Queued drain persistence must include every turn in the chain. + + ``_run_agent()`` recurses when a follow-up arrived while the current turn + was running. The recursive call naturally returns a later + ``history_offset`` because it received the previous turn as part of its + input history. If the outer caller persists transcript rows using that + later offset, it only sees the *last* queued turn as new and drops the + earlier queued turn from the transcript. + """ + history_before_chain = [ + {"role": "user", "content": "Earlier question"}, + {"role": "assistant", "content": "Earlier answer"}, + ] + cool_turn = [ + {"role": "user", "content": "cool"}, + {"role": "assistant", "content": "Quote again"}, + ] + order_turn = [ + {"role": "user", "content": "how to make order?"}, + {"role": "assistant", "content": "Deposit flow"}, + ] + + current_result = { + "history_offset": len(history_before_chain), + "messages": history_before_chain + cool_turn, + } + followup_result = { + "history_offset": len(history_before_chain + cool_turn), + "messages": history_before_chain + cool_turn + order_turn, + } + + merged = _preserve_queued_followup_history_offset( + current_result, + followup_result, + ) + assert merged["history_offset"] == len(history_before_chain) + + persisted = merged["messages"][merged["history_offset"]:] + assert persisted == cool_turn + order_turn + + def test_recursive_queued_followup_preserves_smaller_existing_offset(self): + """Do not widen the slice if the nested result is already conservative.""" + current_result = {"history_offset": 4} + followup_result = {"history_offset": 3, "messages": []} + + merged = _preserve_queued_followup_history_offset( + current_result, + followup_result, + ) + + assert merged["history_offset"] == 3