@staticmethod
def _init_cached_agent_for_turn(agent: Any, interrupt_depth: int) -> None:
    """Prepare a reused (cached) agent's per-turn bookkeeping.

    The inactivity clock is rewound only when a fresh external turn
    begins (``interrupt_depth == 0``).  Without that rewind, a session
    that sat idle for 29 min would trip the inactivity watchdog before
    the new turn makes its first API call (#9051).

    On interrupt-recursive turns (``interrupt_depth > 0``) the previous
    timestamp is left untouched, so the watchdog keeps accumulating the
    idle time of a stuck turn and can still fire the 30-min timeout
    (#15654).

    Args:
        agent: Cached agent object; its ``_last_activity_ts``,
            ``_last_activity_desc`` and ``_api_call_count`` attributes
            are written in place.
        interrupt_depth: Interrupt recursion depth of the turn about to
            start; ``0`` means a fresh external turn.
    """
    is_fresh_turn = interrupt_depth == 0
    if is_fresh_turn:
        now = time.time()
        agent._last_activity_ts = now
        agent._last_activity_desc = "starting new turn (cached)"

    # The API call counter restarts for every turn, whatever the depth.
    agent._api_call_count = 0
class TestCachedAgentInactivityReset:
    """Pin when a reused (cached) agent gets its inactivity clock reset.

    ``GatewayRunner._init_cached_agent_for_turn`` may rewind
    ``_last_activity_ts`` only when ``interrupt_depth == 0``:

    * depth 0 (fresh external turn): the reset is required, otherwise a
      session that sat idle for 29 min trips the watchdog before the new
      turn makes its first API call (#9051).
    * depth > 0 (interrupt-recursive turn): the timestamp has to survive
      so idle time keeps accumulating and the 30-min inactivity timeout
      can still fire for a turn stuck in an interrupt loop (#15654).
    """

    def _fake_agent(self, stale_seconds: float = 1800.0):
        """Build a mock agent whose last activity was ``stale_seconds`` ago."""
        import time as _t

        stub = MagicMock()
        stub._last_activity_ts = _t.time() - stale_seconds
        stub._api_call_count = 10
        stub._last_activity_desc = "previous turn activity"
        return stub

    def test_fresh_turn_resets_idle_clock(self):
        """Depth 0 moves the clock to "now", so a turn that follows a long
        idle period starts with a full inactivity window (#9051)."""
        import time as _t
        from gateway.run import GatewayRunner

        cached = self._fake_agent(stale_seconds=1800.0)
        stale_ts = cached._last_activity_ts
        call_started = _t.time()

        GatewayRunner._init_cached_agent_for_turn(cached, interrupt_depth=0)

        assert cached._last_activity_ts >= call_started, (
            "_last_activity_ts was not reset on a fresh turn (interrupt_depth=0)"
        )
        assert cached._last_activity_ts > stale_ts, (
            "Stale idle time should be cleared so the new turn gets a fresh window"
        )

    def test_interrupt_turn_preserves_idle_clock(self):
        """Depth 1 leaves the clock alone, so idle time built up by a stuck
        turn survives an interrupt-recursive re-entry (#15654)."""
        from gateway.run import GatewayRunner

        cached = self._fake_agent(stale_seconds=1200.0)
        ts_before = cached._last_activity_ts

        GatewayRunner._init_cached_agent_for_turn(cached, interrupt_depth=1)

        assert cached._last_activity_ts == ts_before, (
            "_last_activity_ts must not be reset on interrupt-recursive turns "
            "(interrupt_depth>0) — the watchdog needs the accumulated idle time"
        )

    def test_deep_interrupt_recursion_preserves_idle_clock(self):
        """A deeper non-zero depth (4 here) preserves the clock as well."""
        from gateway.run import GatewayRunner

        cached = self._fake_agent(stale_seconds=600.0)
        ts_before = cached._last_activity_ts

        GatewayRunner._init_cached_agent_for_turn(cached, interrupt_depth=4)

        assert cached._last_activity_ts == ts_before

    def test_api_call_count_reset_regardless_of_depth(self):
        """The API call counter goes back to 0 for fresh and interrupted
        turns alike."""
        from gateway.run import GatewayRunner

        # Depth 0 is a fresh turn, depth 1 an interrupt-recursive one.
        for depth in (0, 1):
            cached = self._fake_agent()
            GatewayRunner._init_cached_agent_for_turn(cached, interrupt_depth=depth)
            assert cached._api_call_count == 0

    def test_watchdog_accumulation_across_recursive_turns(self):
        """Scenario: a turn is stuck, the user interrupts, the turn recurses.

        The idle time the watchdog computes afterwards must still cover
        the whole stuck period instead of restarting from zero.
        """
        import time as _t
        from gateway.run import GatewayRunner

        STUCK_FOR = 1750.0
        cached = self._fake_agent(stale_seconds=STUCK_FOR)

        # The user sends another message while the turn is stuck; that
        # interrupt makes the agent run recurse at depth 1.
        GatewayRunner._init_cached_agent_for_turn(cached, interrupt_depth=1)

        # Same arithmetic as the watchdog: now minus last activity.
        idle_secs = _t.time() - cached._last_activity_ts
        assert idle_secs >= STUCK_FOR - 1.0, (
            f"Watchdog would see {idle_secs:.0f}s idle, expected ~{STUCK_FOR}s. "
            "Inactivity timeout could not fire for a stuck interrupted turn."
        )