fix(gateway): preserve queued follow-up transcript history

Keep the outer history_offset when _run_agent drains queued follow-ups recursively so transcript persistence includes every queued turn in the chain instead of only the last one.
This commit is contained in:
Kong 2026-05-14 02:28:33 +08:00 committed by Teknium
parent 08671d8771
commit 9a815b6c8c
2 changed files with 89 additions and 1 deletion

View file

@ -1139,6 +1139,38 @@ def _should_clear_resume_pending_after_turn(agent_result: dict) -> bool:
return True
def _preserve_queued_followup_history_offset(
current_result: dict,
followup_result: dict,
) -> dict:
"""Carry the outer history offset through queued follow-up drains.
``_process_message_background()`` persists transcript rows only once, after the
entire in-band queued-follow-up chain returns. Each recursive ``_run_agent()``
call advances ``history_offset`` to the history it received, so without
correction the outermost persistence step sees only the *last* queued turn as
"new" and silently drops earlier turns from the same drain chain.
Preserve the earliest (outermost) history offset so the final transcript slice
still includes every queued turn that ran during the chain.
"""
if not isinstance(followup_result, dict):
return followup_result
if not isinstance(current_result, dict):
return followup_result
current_offset = current_result.get("history_offset")
followup_offset = followup_result.get("history_offset")
if not isinstance(current_offset, int):
return followup_result
if isinstance(followup_offset, int) and followup_offset <= current_offset:
return followup_result
merged = dict(followup_result)
merged["history_offset"] = current_offset
return merged
class GatewayRunner:
"""
Main gateway controller.
@ -16042,7 +16074,7 @@ class GatewayRunner:
except Exception:
pass
return await self._run_agent(
followup_result = await self._run_agent(
message=next_message,
context_prompt=context_prompt,
history=updated_history,
@ -16054,6 +16086,7 @@ class GatewayRunner:
event_message_id=next_message_id,
channel_prompt=next_channel_prompt,
)
return _preserve_queued_followup_history_offset(result, followup_result)
finally:
# Stop progress sender, interrupt monitor, and notification task
if progress_task:

View file

@ -14,6 +14,8 @@ to ``_run_agent``'s return dict and uses it for the slice.
import pytest
from gateway.run import _preserve_queued_followup_history_offset
# ---------------------------------------------------------------------------
# Helpers - replicate the filtering logic from _run_agent
@ -265,3 +267,56 @@ class TestTranscriptHistoryOffset:
assert len(fixed_new) == 2
assert fixed_new[0]["content"] == "Now search for dogs"
assert fixed_new[1]["content"] == "Dog results here."
def test_recursive_queued_followup_keeps_outer_history_offset(self):
    """The merged result must expose every turn of a queued drain chain.

    The nested follow-up result carries a later ``history_offset`` because its
    input history already contained the previous queued turn. Slicing with
    that later offset would yield only the last queued turn, so the merged
    result has to fall back to the outer offset and cover both turns.
    """
    prior_history = [
        {"role": "user", "content": "Earlier question"},
        {"role": "assistant", "content": "Earlier answer"},
    ]
    first_queued_turn = [
        {"role": "user", "content": "cool"},
        {"role": "assistant", "content": "Quote again"},
    ]
    second_queued_turn = [
        {"role": "user", "content": "how to make order?"},
        {"role": "assistant", "content": "Deposit flow"},
    ]

    # Conversation state after each turn in the chain.
    after_first = prior_history + first_queued_turn
    after_second = after_first + second_queued_turn
    outer_offset = len(prior_history)

    merged = _preserve_queued_followup_history_offset(
        {"history_offset": outer_offset, "messages": after_first},
        {"history_offset": len(after_first), "messages": after_second},
    )

    assert merged["history_offset"] == outer_offset

    # Rows considered "new" when slicing from the merged offset.
    new_rows = merged["messages"][merged["history_offset"]:]
    assert new_rows == first_queued_turn + second_queued_turn
def test_recursive_queued_followup_preserves_smaller_existing_offset(self):
    """A nested offset already below the outer one is returned unchanged."""
    outer = {"history_offset": 4}
    nested = {"history_offset": 3, "messages": []}

    result = _preserve_queued_followup_history_offset(outer, nested)

    assert result["history_offset"] == 3