From a942bfd9ccf2e988b9564ea8aa383be95ea83731 Mon Sep 17 00:00:00 2001 From: Kyssta Date: Fri, 12 Jun 2026 10:41:34 +0500 Subject: [PATCH] fix(gateway): reset _last_flushed_db_idx when reusing cached agent (#44327) (#44518) Co-authored-by: kyssta-exe --- gateway/run.py | 5 +++++ tests/gateway/test_agent_cache.py | 32 +++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/gateway/run.py b/gateway/run.py index 817c8441bae..e268bc43531 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -12548,6 +12548,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if interrupt_depth == 0: agent._last_activity_ts = time.time() agent._last_activity_desc = "starting new turn (cached)" + # Reset the SessionDB flush cursor so the new turn's messages are + # fully persisted — a stale value from the previous turn would + # cause `_flush_messages_to_session_db` to skip new rows (#44327). + if hasattr(agent, "_last_flushed_db_idx"): + agent._last_flushed_db_idx = 0 agent._api_call_count = 0 def _release_evicted_agent_soft(self, agent: Any) -> None: diff --git a/tests/gateway/test_agent_cache.py b/tests/gateway/test_agent_cache.py index 37f8b51a458..350bf216504 100644 --- a/tests/gateway/test_agent_cache.py +++ b/tests/gateway/test_agent_cache.py @@ -1410,6 +1410,38 @@ class TestCachedAgentInactivityReset: assert agent._last_activity_ts == old_ts + def test_fresh_turn_resets_flush_cursor(self): + """interrupt_depth=0: _last_flushed_db_idx resets so new-turn + messages are fully persisted to the session DB (#44327).""" + from gateway.run import GatewayRunner + + agent = self._fake_agent() + agent._last_flushed_db_idx = 42 # stale from previous turn + + with patch("gateway.run.time") as mock_time: + mock_time.time.return_value = _FAKE_NOW + GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=0) + + assert agent._last_flushed_db_idx == 0, ( + "_last_flushed_db_idx must be reset on a fresh turn so that " + "_flush_messages_to_session_db starts from index 0" + ) + + def test_interrupt_turn_preserves_flush_cursor(self): + """interrupt_depth=1: _last_flushed_db_idx preserved so an + in-progress flush is not disrupted by interrupt re-entry.""" + from gateway.run import GatewayRunner + + agent = self._fake_agent() + agent._last_flushed_db_idx = 42 + + GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=1) + + assert agent._last_flushed_db_idx == 42, ( + "_last_flushed_db_idx must not be reset on interrupt-recursive " + "turns — the flush cursor tracks in-progress writes" + ) + def test_api_call_count_reset_regardless_of_depth(self): """_api_call_count is always reset to 0 for the new turn, at any depth.""" from gateway.run import GatewayRunner